zachjsh commented on code in PR #13196:
URL: https://github.com/apache/druid/pull/13196#discussion_r991593162
##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java:
##########
@@ -139,34 +260,52 @@ public synchronized Meta.Signature getSignature()
public synchronized Meta.Frame nextFrame(final long fetchOffset, final int
fetchMaxRowCount)
{
ensure(State.RUNNING, State.DONE);
- Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] !=
offset [%,d]", fetchOffset, offset);
+ Preconditions.checkState(fetchOffset == nextFetchOffset, "fetchOffset
[%,d] != offset [%,d]", fetchOffset, nextFetchOffset);
if (state == State.DONE) {
- return new Meta.Frame(fetchOffset, true, Collections.emptyList());
+ return new Meta.Frame(fetcher.offset(), true, Collections.emptyList());
}
+ final Future<Meta.Frame> future;
+ if (fetchFuture == null) {
+ // Not waiting on a batch. Request one now.
+ fetcher.setBatchSize(fetchMaxRowCount);
+ future = queryExecutor.submit(fetcher);
+ } else {
+ // Last batch took too long. Continue waiting for it.
+ future = fetchFuture;
+ fetchFuture = null;
+ }
try {
- final List<Object> rows = new ArrayList<>();
- while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset <
fetchOffset + fetchMaxRowCount)) {
- rows.add(yielder.get());
- yielder = yielder.next(null);
- offset++;
- }
-
- if (yielder.isDone()) {
+ Meta.Frame result = future.get(fetcherFactory.fetchTimeoutMs(),
TimeUnit.MILLISECONDS);
+ if (result.done) {
state = State.DONE;
}
-
- return new Meta.Frame(fetchOffset, state == State.DONE, rows);
+ nextFetchOffset = fetcher.offset;
+ return result;
}
- catch (Throwable t) {
- throw closeAndPropagateThrowable(t);
+ catch (CancellationException | InterruptedException e) {
+ // Consider this a failure.
+ throw closeAndPropagateThrowable(e);
+ }
+ catch (ExecutionException e) {
+ // Fetch threw an error. Unwrap it.
+ throw closeAndPropagateThrowable(e.getCause());
+ }
+ catch (TimeoutException e) {
+ fetchFuture = future;
+ // Wait timed out. Return 0 rows: the client will try again later.
+ // We'll wait again on this same fetch next time.
+ // Note that when the next fetch request comes, it will use the batch
+ // size set here: any change in size will be ignored for the in-flight
batch.
+ // Changing batch size mid-query is an odd case: it will probably never
happen.
+ return new Meta.Frame(nextFetchOffset, false, Collections.emptyList());
Review Comment:
If we return 0 as batch size, does the avatica client automatically retry
until it sees EOF, or is the client at risk of thinking there is 0 rows of
data? I assume it waits for EOF, just wanted to double check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]