paul-rogers commented on code in PR #13196:
URL: https://github.com/apache/druid/pull/13196#discussion_r992832687


##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java:
##########
@@ -139,34 +260,52 @@ public synchronized Meta.Signature getSignature()
   public synchronized Meta.Frame nextFrame(final long fetchOffset, final int 
fetchMaxRowCount)
   {
     ensure(State.RUNNING, State.DONE);
-    Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] != 
offset [%,d]", fetchOffset, offset);
+    Preconditions.checkState(fetchOffset == nextFetchOffset, "fetchOffset 
[%,d] != offset [%,d]", fetchOffset, nextFetchOffset);
     if (state == State.DONE) {
-      return new Meta.Frame(fetchOffset, true, Collections.emptyList());
+      return new Meta.Frame(fetcher.offset(), true, Collections.emptyList());
     }
 
+    final Future<Meta.Frame> future;
+    if (fetchFuture == null) {
+      // Not waiting on a batch. Request one now.
+      fetcher.setBatchSize(fetchMaxRowCount);
+      future = queryExecutor.submit(fetcher);
+    } else {
+      // Last batch took too long. Continue waiting for it.
+      future = fetchFuture;
+      fetchFuture = null;
+    }
     try {
-      final List<Object> rows = new ArrayList<>();
-      while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < 
fetchOffset + fetchMaxRowCount)) {
-        rows.add(yielder.get());
-        yielder = yielder.next(null);
-        offset++;
-      }
-
-      if (yielder.isDone()) {
+      Meta.Frame result = future.get(fetcherFactory.fetchTimeoutMs(), 
TimeUnit.MILLISECONDS);

Review Comment:
   Great question! There is a tradeoff. Keeping the old code would be best done 
by having two variations of this class: the original one and this revised one. 
That would seem to increase the error surface by having more opportunities for 
bugs to creep in.
   
   It would seem that the new functionality is binary: it works or it doesn't. 
Users are obligated to use the Avatica client, so if we test that the 
zero-batch trick works for that client, we should be good.
   
   So, my thought is we just have the one implementation, and let tests tell is 
if anything broke. Let's revisit this decision if the tests tell us something 
is flaky and our assumptions are not valid. 



##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java:
##########
@@ -139,34 +260,52 @@ public synchronized Meta.Signature getSignature()
   public synchronized Meta.Frame nextFrame(final long fetchOffset, final int 
fetchMaxRowCount)
   {
     ensure(State.RUNNING, State.DONE);
-    Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] != 
offset [%,d]", fetchOffset, offset);
+    Preconditions.checkState(fetchOffset == nextFetchOffset, "fetchOffset 
[%,d] != offset [%,d]", fetchOffset, nextFetchOffset);
     if (state == State.DONE) {
-      return new Meta.Frame(fetchOffset, true, Collections.emptyList());
+      return new Meta.Frame(fetcher.offset(), true, Collections.emptyList());
     }
 
+    final Future<Meta.Frame> future;
+    if (fetchFuture == null) {
+      // Not waiting on a batch. Request one now.
+      fetcher.setBatchSize(fetchMaxRowCount);
+      future = queryExecutor.submit(fetcher);
+    } else {
+      // Last batch took too long. Continue waiting for it.
+      future = fetchFuture;
+      fetchFuture = null;
+    }
     try {
-      final List<Object> rows = new ArrayList<>();
-      while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < 
fetchOffset + fetchMaxRowCount)) {
-        rows.add(yielder.get());
-        yielder = yielder.next(null);
-        offset++;
-      }
-
-      if (yielder.isDone()) {
+      Meta.Frame result = future.get(fetcherFactory.fetchTimeoutMs(), 
TimeUnit.MILLISECONDS);
+      if (result.done) {
         state = State.DONE;
       }
-
-      return new Meta.Frame(fetchOffset, state == State.DONE, rows);
+      nextFetchOffset = fetcher.offset;
+      return result;
     }
-    catch (Throwable t) {
-      throw closeAndPropagateThrowable(t);
+    catch (CancellationException | InterruptedException e) {
+      // Consider this a failure.
+      throw closeAndPropagateThrowable(e);
+    }
+    catch (ExecutionException e) {
+      // Fetch threw an error. Unwrap it.
+      throw closeAndPropagateThrowable(e.getCause());
+    }
+    catch (TimeoutException e) {
+      fetchFuture = future;
+      // Wait timed out. Return 0 rows: the client will try again later.
+      // We'll wait again on this same fetch next time.
+      // Note that when the next fetch request comes, it will use the batch
+      // size set here: any change in size will be ignored for the in-flight 
batch.
+      // Changing batch size mid-query is an odd case: it will probably never 
happen.
+      return new Meta.Frame(nextFetchOffset, false, Collections.emptyList());

Review Comment:
   There is a new test that verifies that this is true. It sleeps for 5 seconds 
before fetching the first row. The fetch timeout is 1 second. And, sure enough, 
if we add some logging (then strip out the noise):
   
   ```text
   Timeout of batch at offset 0
   Timeout of batch at offset 0 
   Fetched batch at offset 0
   ```
   
   The first two log lines indicate that we returned a zero-row batch to the 
Avatica client which turned around and asked us again. Again we returned no 
rows, so the client asked a third time,  this time with feeling, "give me some 
rows!", and we obliged with the 6 rows from the test table. (Sadly, we don't 
have access to the batch row count, so it isn't logged above.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to