deniskuzZ commented on code in PR #4902:
URL: https://github.com/apache/hive/pull/4902#discussion_r1615640234


##########
jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java:
##########
@@ -307,66 +345,220 @@ private void closeOperationHandle(TOperationHandle stmtHandle) throws SQLExcepti
     }
   }
 
-  /**
-   * Moves the cursor down one row from its current position.
-   *
-   * @see java.sql.ResultSet#next()
-   * @throws SQLException
-   *           if a database access error occurs.
-   */
-  public boolean next() throws SQLException {
+  ExecutorService pool = null;
+  volatile boolean hasStartRow = false;
+  int poolSize = 1;
+  int threadsStarted = 0;
+
+  private class FetchResult {
+    Exception ex;
+    RowSet fetchedRows;
+    boolean hasMoreRows;
+    long startRow;
+    int numRows;
+  }
+
+  BlockingQueue<FetchResult> resultQueue = new ArrayBlockingQueue<FetchResult>(4);
+  AtomicLong nextStartRow = new AtomicLong(1L);
+
+  volatile InterruptedException interruptException = null;
+  volatile boolean gotLastBatch = false;
+  volatile boolean poolDone = false;
+
+  private boolean nextRowBatch() throws SQLException {
     if (isClosed) {
       throw new SQLException("Resultset is closed");
     }
-    if (emptyResultSet || (maxRows > 0 && rowsFetched >= maxRows)) {
+    if ((maxRows > 0 && rowsFetched >= maxRows) || emptyResultSet || fetchDone) {
       return false;
     }
-
-    /*
-     * Poll on the operation status, till the operation is complete.
-     * We need to wait only for HiveStatement to complete.
-     * HiveDatabaseMetaData which also uses this ResultSet returns only after the RPC is complete.
-     */
-    // when isHasResultSet is set, the query transitioned from running -> complete and is not expected go back to
-    // running state when fetching results (implicit state transition)
-    if ((statement instanceof HiveStatement) && (operationStatus == null || !operationStatus.isHasResultSet())) {
-      operationStatus = ((HiveStatement) statement).waitForOperationToComplete();
+    if (check_operation_status) {
+      TGetOperationStatusResp operationStatus =
+        ((HiveStatement) statement).waitForOperationToComplete();
+      check_operation_status = false;
     }
 
-    try {
-      TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT;
-      if (fetchFirst) {
-        // If we are asked to start from beginning, clear the current fetched resultset
-        orientation = TFetchOrientation.FETCH_FIRST;
-        fetchedRows = null;
-        fetchedRowsItr = null;
-        fetchFirst = false;
-      }
-      if (fetchedRows == null || !fetchedRowsItr.hasNext()) {
+// TODO: Could support pool with maxRows by bounding results instead
+    if (rowsFetched < fetchSize || poolSize == 0 || maxRows > 0) {
+      try {
+        int fetchSizeBounded = fetchSize;
+        if (maxRows > 0 && rowsFetched + fetchSize > maxRows) {
+          fetchSizeBounded = maxRows - (int)rowsFetched;
+        }
         TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle,
-            orientation, fetchSize);
-        LOG.debug("HiveQueryResultsFetchReq: {}", fetchReq);
-        TFetchResultsResp fetchResp;
-        fetchResp = client.FetchResults(fetchReq);
+            orientation, fetchSizeBounded);
+        TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
         Utils.verifySuccessWithInfo(fetchResp.getStatus());
-
         TRowSet results = fetchResp.getResults();
+        if (results.getStartRowOffset() > 0) {
+          hasStartRow = true;
+        }
         fetchedRows = RowSetFactory.create(results, protocol);
-        fetchedRowsItr = fetchedRows.iterator();
+        fetchDone = !fetchResp.isHasMoreRows() && fetchedRows.numRows() == 0;
+        if (fetchDone) {
+          gotLastBatch = true;
+        }
+        fetchedRows = RowSetFactory.create(results, protocol);
+        if (useConvertedResultSet) {
+          fetchedRows = new ConvertedResultSet(fetchedRows, getSchema());
+        }
+
+        nextStartRow.set(results.getStartRowOffset() + 1 + fetchedRows.numRows());
+      } catch (TException ex) {
+        ex.printStackTrace();
+        throw new SQLException("Error retrieving next row", ex);
+      }
+    } else {
+      if (!gotLastBatch) {
+        if (pool == null) {
+          pool = Executors.newFixedThreadPool(poolSize);
+        }
+        // Add another thread on each row batch up to the limit
+        if(threadsStarted < (hasStartRow ? poolSize : 1)) {
+          final boolean useMainClient = (threadsStarted == 0);
+          threadsStarted++;
+          pool.execute(()->{
+            LOG.debug("Started thread {}", Thread.currentThread().getName());
+
+            final TCLIService.Iface fetchClient;
+            if (useMainClient) {
+              fetchClient = client;
+            } else {
+              try {
+                fetchClient = ((HiveConnection)connection).Clone().GetClient();
+              } catch (SQLException e) {
+                LOG.debug("Multi-stream connection error {}", e.toString());
+                return;
+              }
+            }
+
+            while (!gotLastBatch && !poolDone) {
+              FetchResult result = new FetchResult();
+              try {
+                final TFetchResultsReq fetchReq = new TFetchResultsReq(
+                    stmtHandle, orientation, fetchSize);
+                TFetchResultsResp fetchResp = fetchClient.FetchResults(fetchReq);
+                Utils.verifySuccessWithInfo(fetchResp.getStatus());
+                TRowSet results = fetchResp.getResults();
+                if (results.getStartRowOffset() > 0) {
+                  hasStartRow = true;
+                }
+                result.fetchedRows = RowSetFactory.create(results, protocol);
+                result.numRows = result.fetchedRows.numRows();
+                boolean hasMoreRows = result.numRows > 0 || fetchResp.isHasMoreRows();
+                if (!hasMoreRows) {
+                  gotLastBatch = true;
+                }
+                result.hasMoreRows = hasMoreRows;
+                result.fetchedRows = RowSetFactory.create(results, protocol);
+                if (useConvertedResultSet) {
+                  result.fetchedRows =
+                      new ConvertedResultSet(result.fetchedRows, getSchema());
+                }
+                result.startRow = results.getStartRowOffset() + 1;
+                if (hasStartRow && result.startRow < nextStartRow.get()) {
+                  throw new SQLException("Unexpected row offset");
+                }
+              } catch (Exception e) {
+                result.ex = e;
+              }
+
+              try {
+                // Wait for earlier row sets to be added to the queue
+                synchronized(nextStartRow) {
+                  if (!poolDone) {
+                    if (result.ex == null) {
+                      if (hasStartRow) {
+                        while (nextStartRow.get() != result.startRow) {
+                          nextStartRow.wait();
+                        }
+                        nextStartRow.set(result.startRow + result.numRows);
+                      }
+                      poolDone = !result.hasMoreRows;
+                    } else {
+                      poolDone = true;
+                    }
+                    resultQueue.put(result);
+                    if (hasStartRow) {
+                      nextStartRow.notifyAll();
+                    }
+                  }
+                }
+              } catch (InterruptedException e) {
+                interruptException = e;
+                break;
+              }
+            }
+            // TODO: shutdown called here can make execute return
+            // RejectedExecutionException
+            // pool.shutdownNow();
+          });
+        }
+      }
+      try {
+        if (interruptException != null) {
+          throw interruptException;

Review Comment:
   should we `shutdown` the pool?
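   For reference, a minimal sketch (not the PR's code) of one way to tear the fetch pool down, e.g. when the result set is closed. The helper name `shutdownFetchPool` and the 5-second timeout are illustrative; it only uses fields already introduced in this diff and assumes `java.util.concurrent.TimeUnit` is imported:

   ```java
   // Sketch only: tear down the background fetch pool and discard pending batches.
   private void shutdownFetchPool() {
     poolDone = true;                      // tell the worker loops to stop fetching
     if (pool != null) {
       pool.shutdownNow();                 // interrupts threads blocked in put()/wait()
       try {
         if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
           LOG.warn("Fetch pool did not terminate in time");
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
       pool = null;
       threadsStarted = 0;
     }
     resultQueue.clear();                  // drop batches that will never be consumed
   }
   ```

   Since the workers can block in `resultQueue.put()` or `nextStartRow.wait()`, an interrupting `shutdownNow()` seems needed here rather than a plain `shutdown()`.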



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

