deniskuzZ commented on code in PR #4902: URL: https://github.com/apache/hive/pull/4902#discussion_r1615636668
########## jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java: ########## @@ -307,66 +345,220 @@ private void closeOperationHandle(TOperationHandle stmtHandle) throws SQLExcepti } } - /** - * Moves the cursor down one row from its current position. - * - * @see java.sql.ResultSet#next() - * @throws SQLException - * if a database access error occurs. - */ - public boolean next() throws SQLException { + ExecutorService pool = null; + volatile boolean hasStartRow = false; + int poolSize = 1; + int threadsStarted = 0; + + private class FetchResult { + Exception ex; + RowSet fetchedRows; + boolean hasMoreRows; + long startRow; + int numRows; + } + + BlockingQueue<FetchResult> resultQueue = new ArrayBlockingQueue<FetchResult>(4); + AtomicLong nextStartRow = new AtomicLong(1L); + + volatile InterruptedException interruptException = null; + volatile boolean gotLastBatch = false; + volatile boolean poolDone = false; + + private boolean nextRowBatch() throws SQLException { if (isClosed) { throw new SQLException("Resultset is closed"); } - if (emptyResultSet || (maxRows > 0 && rowsFetched >= maxRows)) { + if ((maxRows > 0 && rowsFetched >= maxRows) || emptyResultSet || fetchDone) { return false; } - - /* - * Poll on the operation status, till the operation is complete. - * We need to wait only for HiveStatement to complete. - * HiveDatabaseMetaData which also uses this ResultSet returns only after the RPC is complete. 
- */ - // when isHasResultSet is set, the query transitioned from running -> complete and is not expected go back to - // running state when fetching results (implicit state transition) - if ((statement instanceof HiveStatement) && (operationStatus == null || !operationStatus.isHasResultSet())) { - operationStatus = ((HiveStatement) statement).waitForOperationToComplete(); + if (check_operation_status) { + TGetOperationStatusResp operationStatus = + ((HiveStatement) statement).waitForOperationToComplete(); + check_operation_status = false; } - try { - TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT; - if (fetchFirst) { - // If we are asked to start from beginning, clear the current fetched resultset - orientation = TFetchOrientation.FETCH_FIRST; - fetchedRows = null; - fetchedRowsItr = null; - fetchFirst = false; - } - if (fetchedRows == null || !fetchedRowsItr.hasNext()) { +// TODO: Could support pool with maxRows by bounding results instead + if (rowsFetched < fetchSize || poolSize == 0 || maxRows > 0) { + try { + int fetchSizeBounded = fetchSize; + if (maxRows > 0 && rowsFetched + fetchSize > maxRows) { + fetchSizeBounded = maxRows - (int)rowsFetched; + } TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, - orientation, fetchSize); - LOG.debug("HiveQueryResultsFetchReq: {}", fetchReq); - TFetchResultsResp fetchResp; - fetchResp = client.FetchResults(fetchReq); + orientation, fetchSizeBounded); + TFetchResultsResp fetchResp = client.FetchResults(fetchReq); Utils.verifySuccessWithInfo(fetchResp.getStatus()); - TRowSet results = fetchResp.getResults(); + if (results.getStartRowOffset() > 0) { + hasStartRow = true; + } fetchedRows = RowSetFactory.create(results, protocol); - fetchedRowsItr = fetchedRows.iterator(); + fetchDone = !fetchResp.isHasMoreRows() && fetchedRows.numRows() == 0; + if (fetchDone) { + gotLastBatch = true; + } + fetchedRows = RowSetFactory.create(results, protocol); + if (useConvertedResultSet) { + fetchedRows = new 
ConvertedResultSet(fetchedRows, getSchema()); + } + + nextStartRow.set(results.getStartRowOffset() + 1 + fetchedRows.numRows()); + } catch (TException ex) { + ex.printStackTrace(); + throw new SQLException("Error retrieving next row", ex); + } + } else { + if (!gotLastBatch) { + if (pool == null) { + pool = Executors.newFixedThreadPool(poolSize); + } + // Add another thread on each row batch up to the limit + if(threadsStarted < (hasStartRow ? poolSize : 1)) { Review Comment: nit: missing space between `if` and `(` — should be `if (threadsStarted < ...`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For additional commands, e-mail: gitbox-h...@hive.apache.org