imply-cheddar commented on code in PR #13564:
URL: https://github.com/apache/druid/pull/13564#discussion_r1049209685
##########
server/src/main/java/org/apache/druid/server/QueryResource.java:
##########
@@ -171,161 +167,112 @@ public Response cancelQuery(@PathParam("id") String
queryId, @Context final Http
@POST
@Produces({MediaType.APPLICATION_JSON,
SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON,
SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE})
+ @Nullable
public Response doPost(
final InputStream in,
@QueryParam("pretty") final String pretty,
-
- // used to get request content-type,Accept header, remote address and
auth-related headers
@Context final HttpServletRequest req
) throws IOException
{
final QueryLifecycle queryLifecycle = queryLifecycleFactory.factorize();
- final ResourceIOReaderWriter ioReaderWriter =
createResourceIOReaderWriter(req, pretty != null);
+ final ResourceIOReaderWriter io = createResourceIOReaderWriter(req, pretty
!= null);
final String currThreadName = Thread.currentThread().getName();
try {
- final Query<?> query = readQuery(req, in, ioReaderWriter);
+ final Query<?> query;
+ try {
+ query = readQuery(req, in, io);
+ }
+ catch (QueryException e) {
+ return
io.getResponseWriter().buildNonOkResponse(e.getFailType().getExpectedStatus(),
e);
+ }
+
queryLifecycle.initialize(query);
- final String queryId = queryLifecycle.getQueryId();
final String queryThreadName = queryLifecycle.threadName(currThreadName);
Thread.currentThread().setName(queryThreadName);
if (log.isDebugEnabled()) {
log.debug("Got query [%s]", queryLifecycle.getQuery());
}
- final Access authResult = queryLifecycle.authorize(req);
+ final Access authResult;
+ try {
+ authResult = queryLifecycle.authorize(req);
+ }
+ catch (RuntimeException e) {
+ return io.getResponseWriter()
+ .buildNonOkResponse(
+ HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+ QueryInterruptedException.wrapIfNeeded(e)
+ );
+ }
if (!authResult.isAllowed()) {
throw new ForbiddenException(authResult.toString());
}
- final QueryResponse<?> queryResponse = queryLifecycle.execute();
- final Sequence<?> results = queryResponse.getResults();
- final ResponseContext responseContext =
queryResponse.getResponseContext();
- final String prevEtag = getPreviousEtag(req);
-
- if (prevEtag != null && prevEtag.equals(responseContext.getEntityTag()))
{
- queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), -1);
- successfulQueryCount.incrementAndGet();
- return Response.notModified().build();
- }
-
- final Yielder<?> yielder = Yielders.each(results);
+ // We use an async context not because we are actually going to run this
async, but because we want to delay
+ // the decision of what the response code should be until we have gotten
the first few data points to return.
+ // Returning a Response object from this point forward requires that
object to know the status code, which we
+ // don't actually know until we are in the accumulator, but if we try to
return a Response object from the
+ // accumulator, we cannot properly stream results back, because the
accumulator won't release control of the
+ // Response until it has consumed the underlying Sequence.
+ final AsyncContext asyncContext = req.startAsync();
try {
- final ObjectWriter jsonWriter =
queryLifecycle.newOutputWriter(ioReaderWriter);
-
- Response.ResponseBuilder responseBuilder = Response
- .ok(
- new StreamingOutput()
- {
- @Override
- public void write(OutputStream outputStream) throws
WebApplicationException
- {
- Exception e = null;
-
- CountingOutputStream os = new
CountingOutputStream(outputStream);
- try {
- // json serializer will always close the yielder
- jsonWriter.writeValue(os, yielder);
-
- os.flush(); // Some types of OutputStream suppress flush
errors in the .close() method.
- os.close();
- }
- catch (Exception ex) {
- e = ex;
- log.noStackTrace().error(ex, "Unable to send query
response.");
- throw new RuntimeException(ex);
- }
- finally {
- Thread.currentThread().setName(currThreadName);
-
- queryLifecycle.emitLogsAndMetrics(e,
req.getRemoteAddr(), os.getCount());
-
- if (e == null) {
- successfulQueryCount.incrementAndGet();
- } else {
- failedQueryCount.incrementAndGet();
- }
- }
- }
- },
- ioReaderWriter.getResponseWriter().getResponseType()
- )
- .header(QUERY_ID_RESPONSE_HEADER, queryId);
-
- attachResponseContextToHttpResponse(queryId, responseContext,
responseBuilder, jsonMapper,
- responseContextConfig, selfNode
- );
-
- return responseBuilder.build();
+ new QueryResourceResultPusher(req, queryLifecycle, io,
(HttpServletResponse) asyncContext.getResponse())
+ .push();
}
- catch (QueryException e) {
- // make sure to close yielder if anything happened before starting to
serialize the response.
- yielder.close();
+ finally {
+ asyncContext.complete();
+ }
+ }
+ catch (Exception e) {
+ if (e instanceof ForbiddenException && !req.isAsyncStarted()) {
Review Comment:
That's throwing the exception, not catching it. Also, it is thrown
from inside the try, and this catch is for generic Exception, so it will catch
the exception thrown from inside its own try. This check basically ensures
that we pass it through. It's ugly, I agree, but ultimately the mismatch is
that we expect all exceptions to be handled and converted into responses here,
except for this one exception that we expect to make it out of the method. We
should really refactor it so that we serialize and handle the
ForbiddenException in this code as well, which will make it all consistent.
That is, instead of throwing the ForbiddenException, the code above could just
create the proper Response object and return it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]