kfaraz commented on code in PR #13564:
URL: https://github.com/apache/druid/pull/13564#discussion_r1049170777
##########
core/src/main/java/org/apache/druid/query/QueryException.java:
##########
@@ -30,12 +29,126 @@
import java.util.function.Function;
/**
- * Base serializable error response
- *
+ * Base serializable error response.
+ * <p>
+ * The Object Model that QueryException follows is a little non-intuitive as
the primary way that a QueryException is
+ * generated is through a child class. However, those child classes are *not*
equivalent to a QueryException, instead
+ * they act as a Factory of QueryException objects. This can be seen in two
different places.
+ * <p>
+ * 1. When sanitize() is called, the response is a QueryException without any
indication of which original exception
+ * occurred.
+ * 2. When these objects get serialized across the wire the recipient
deserializes a QueryException. The client is
+ * never expected, and fundamentally is not allowed to, ever deserialize a
child class of QueryException.
+ * <p>
+ * For this reason, QueryException must contain all potential state that any
of its child classes could ever want to
+ * push across the wire. Additionally, any catch clauses expecting one of the
child Exceptions must know that it is
+ * running inside of code where the exception has not traveled across the
wire. If there is a chance that the
+ * exception could have been serialized across the wire, the code must catch a
QueryException and check the errorCode
+ * instead.
+ * <p>
+ * As a corollary, adding new state or adjusting the logic of this class must
always be done in a backwards-compatible
Review Comment:
Rather than trying to always maintain backwards compatibility with all the
child exceptions, do you think it would be better if we get rid of all the
child exceptions and always just throw a `QueryException` with the correct
error code?
##########
server/src/main/java/org/apache/druid/server/QueryResource.java:
##########
@@ -171,161 +167,112 @@ public Response cancelQuery(@PathParam("id") String
queryId, @Context final Http
@POST
@Produces({MediaType.APPLICATION_JSON,
SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON,
SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE})
+ @Nullable
public Response doPost(
final InputStream in,
@QueryParam("pretty") final String pretty,
-
- // used to get request content-type,Accept header, remote address and
auth-related headers
@Context final HttpServletRequest req
) throws IOException
{
final QueryLifecycle queryLifecycle = queryLifecycleFactory.factorize();
- final ResourceIOReaderWriter ioReaderWriter =
createResourceIOReaderWriter(req, pretty != null);
+ final ResourceIOReaderWriter io = createResourceIOReaderWriter(req, pretty
!= null);
final String currThreadName = Thread.currentThread().getName();
try {
- final Query<?> query = readQuery(req, in, ioReaderWriter);
+ final Query<?> query;
+ try {
+ query = readQuery(req, in, io);
+ }
+ catch (QueryException e) {
+ return
io.getResponseWriter().buildNonOkResponse(e.getFailType().getExpectedStatus(),
e);
+ }
+
queryLifecycle.initialize(query);
- final String queryId = queryLifecycle.getQueryId();
final String queryThreadName = queryLifecycle.threadName(currThreadName);
Thread.currentThread().setName(queryThreadName);
if (log.isDebugEnabled()) {
log.debug("Got query [%s]", queryLifecycle.getQuery());
}
- final Access authResult = queryLifecycle.authorize(req);
+ final Access authResult;
+ try {
+ authResult = queryLifecycle.authorize(req);
+ }
+ catch (RuntimeException e) {
+ return io.getResponseWriter()
+ .buildNonOkResponse(
+ HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+ QueryInterruptedException.wrapIfNeeded(e)
+ );
+ }
if (!authResult.isAllowed()) {
throw new ForbiddenException(authResult.toString());
}
- final QueryResponse<?> queryResponse = queryLifecycle.execute();
- final Sequence<?> results = queryResponse.getResults();
- final ResponseContext responseContext =
queryResponse.getResponseContext();
- final String prevEtag = getPreviousEtag(req);
-
- if (prevEtag != null && prevEtag.equals(responseContext.getEntityTag()))
{
- queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), -1);
- successfulQueryCount.incrementAndGet();
- return Response.notModified().build();
- }
-
- final Yielder<?> yielder = Yielders.each(results);
+ // We use an async context not because we are actually going to run this
async, but because we want to delay
+ // the decision of what the response code should be until we have gotten
the first few data points to return.
+ // Returning a Response object from this point forward requires that
object to know the status code, which we
+ // don't actually know until we are in the accumulator, but if we try to
return a Response object from the
+ // accumulator, we cannot properly stream results back, because the
accumulator won't release control of the
+ // Response until it has consumed the underlying Sequence.
+ final AsyncContext asyncContext = req.startAsync();
try {
- final ObjectWriter jsonWriter =
queryLifecycle.newOutputWriter(ioReaderWriter);
-
- Response.ResponseBuilder responseBuilder = Response
- .ok(
- new StreamingOutput()
- {
- @Override
- public void write(OutputStream outputStream) throws
WebApplicationException
- {
- Exception e = null;
-
- CountingOutputStream os = new
CountingOutputStream(outputStream);
- try {
- // json serializer will always close the yielder
- jsonWriter.writeValue(os, yielder);
-
- os.flush(); // Some types of OutputStream suppress flush
errors in the .close() method.
- os.close();
- }
- catch (Exception ex) {
- e = ex;
- log.noStackTrace().error(ex, "Unable to send query
response.");
- throw new RuntimeException(ex);
- }
- finally {
- Thread.currentThread().setName(currThreadName);
-
- queryLifecycle.emitLogsAndMetrics(e,
req.getRemoteAddr(), os.getCount());
-
- if (e == null) {
- successfulQueryCount.incrementAndGet();
- } else {
- failedQueryCount.incrementAndGet();
- }
- }
- }
- },
- ioReaderWriter.getResponseWriter().getResponseType()
- )
- .header(QUERY_ID_RESPONSE_HEADER, queryId);
-
- attachResponseContextToHttpResponse(queryId, responseContext,
responseBuilder, jsonMapper,
- responseContextConfig, selfNode
- );
-
- return responseBuilder.build();
+ new QueryResourceResultPusher(req, queryLifecycle, io,
(HttpServletResponse) asyncContext.getResponse())
+ .push();
}
- catch (QueryException e) {
- // make sure to close yielder if anything happened before starting to
serialize the response.
- yielder.close();
+ finally {
+ asyncContext.complete();
+ }
+ }
+ catch (Exception e) {
+ if (e instanceof ForbiddenException && !req.isAsyncStarted()) {
Review Comment:
Wouldn't the `ForbiddenException` be caught already while doing the
`queryLifecycle.authorize()` above?
##########
server/src/main/java/org/apache/druid/server/QueryResource.java:
##########
@@ -565,4 +496,142 @@ public static void transferEntityTag(ResponseContext
context, Response.ResponseB
builder.header(HEADER_ETAG, entityTag);
}
}
+
+ private class QueryResourceQueryMetricCounter implements QueryMetricCounter
+ {
+ @Override
+ public void incrementSuccess()
+ {
+ successfulQueryCount.incrementAndGet();
Review Comment:
Nit: The different `AtomicLong`s for the counts can now be made member
fields of this class itself.
##########
sql/src/main/java/org/apache/druid/sql/http/SqlResource.java:
##########
@@ -320,4 +181,207 @@ public Response cancelQuery(
return Response.status(Status.FORBIDDEN).build();
}
}
+
+ /**
+ * The SqlResource only generates metrics and doesn't keep track of
aggregate counts of successful/failed/interrupted
+ * queries, so this implementation is effectively just a noop.
+ */
+ private static class SqlResourceQueryMetricCounter implements
QueryResource.QueryMetricCounter
+ {
+ @Override
+ public void incrementSuccess()
+ {
+
+ }
+
+ @Override
+ public void incrementFailed()
+ {
+
+ }
+
+ @Override
+ public void incrementInterrupted()
+ {
+
+ }
+
+ @Override
+ public void incrementTimedOut()
+ {
+
+ }
+ }
+
+ private class SqlResourceResultPusher extends ResultPusher
+ {
+ private final String sqlQueryId;
+ private final HttpStatement stmt;
+ private final SqlQuery sqlQuery;
+
+ public SqlResourceResultPusher(
+ AsyncContext asyncContext,
+ String sqlQueryId,
+ HttpStatement stmt,
+ SqlQuery sqlQuery
+ )
+ {
+ super(
+ (HttpServletResponse) asyncContext.getResponse(),
+ SqlResource.this.jsonMapper,
+ SqlResource.this.responseContextConfig,
+ SqlResource.this.selfNode,
+ SqlResource.QUERY_METRIC_COUNTER,
+ sqlQueryId,
+ MediaType.APPLICATION_JSON_TYPE
+ );
+ this.sqlQueryId = sqlQueryId;
+ this.stmt = stmt;
+ this.sqlQuery = sqlQuery;
+ }
+
+ @Override
+ public ResultsWriter start()
+ {
+ return new ResultsWriter()
+ {
+ private ResultSet thePlan;
+
+ @Override
+ @Nullable
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public QueryResponse<Object> start(HttpServletResponse response)
+ {
+ response.setHeader(SQL_QUERY_ID_RESPONSE_HEADER, sqlQueryId);
+
+ final QueryResponse<Object[]> retVal;
+ try {
+ thePlan = stmt.plan();
+ retVal = thePlan.run();
+ }
+ catch (RelOptPlanner.CannotPlanException e) {
+ recordFailure(e);
+ final SqlPlanningException wrappedException = new
SqlPlanningException(
+ SqlPlanningException.PlanningError.UNSUPPORTED_SQL_ERROR,
+ e.getMessage()
+ );
+
+ writeErrorResponse(HttpServletResponse.SC_BAD_REQUEST, response,
wrappedException);
+ return null;
+ }
+ // There is a claim that Calcite sometimes throws a
java.lang.AssertionError, but we do not have a test that can
+ // reproduce it checked into the code (the best we have is something
that uses mocks to throw an Error, which is
+ // dubious at best). We keep this just in case, but it might be
best to remove it and see where the
+ // AssertionErrors are coming from and do something to ensure that
they don't actually make it out of Calcite
+ catch (AssertionError e) {
+ log.warn(e, "AssertionError killed query: %s", sqlQuery);
+ final QueryInterruptedException wrappedEx =
QueryInterruptedException.wrapIfNeeded(e);
Review Comment:
Should this be a `QueryException` with `UNSUPPORTED_QUERY_ERROR_CODE`?
##########
server/src/main/java/org/apache/druid/server/QueryResource.java:
##########
@@ -171,161 +167,112 @@ public Response cancelQuery(@PathParam("id") String
queryId, @Context final Http
@POST
@Produces({MediaType.APPLICATION_JSON,
SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON,
SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE})
+ @Nullable
public Response doPost(
final InputStream in,
@QueryParam("pretty") final String pretty,
-
- // used to get request content-type,Accept header, remote address and
auth-related headers
@Context final HttpServletRequest req
) throws IOException
{
final QueryLifecycle queryLifecycle = queryLifecycleFactory.factorize();
- final ResourceIOReaderWriter ioReaderWriter =
createResourceIOReaderWriter(req, pretty != null);
+ final ResourceIOReaderWriter io = createResourceIOReaderWriter(req, pretty
!= null);
final String currThreadName = Thread.currentThread().getName();
try {
- final Query<?> query = readQuery(req, in, ioReaderWriter);
+ final Query<?> query;
+ try {
+ query = readQuery(req, in, io);
+ }
+ catch (QueryException e) {
+ return
io.getResponseWriter().buildNonOkResponse(e.getFailType().getExpectedStatus(),
e);
+ }
+
queryLifecycle.initialize(query);
- final String queryId = queryLifecycle.getQueryId();
final String queryThreadName = queryLifecycle.threadName(currThreadName);
Thread.currentThread().setName(queryThreadName);
if (log.isDebugEnabled()) {
log.debug("Got query [%s]", queryLifecycle.getQuery());
}
- final Access authResult = queryLifecycle.authorize(req);
+ final Access authResult;
+ try {
+ authResult = queryLifecycle.authorize(req);
+ }
+ catch (RuntimeException e) {
+ return io.getResponseWriter()
+ .buildNonOkResponse(
+ HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+ QueryInterruptedException.wrapIfNeeded(e)
Review Comment:
Should this be a `QueryException` with `UNAUTHORIZED_ERROR_CODE` instead?
##########
core/src/main/java/org/apache/druid/query/QueryException.java:
##########
@@ -30,12 +29,126 @@
import java.util.function.Function;
/**
- * Base serializable error response
- *
+ * Base serializable error response.
+ * <p>
+ * The Object Model that QueryException follows is a little non-intuitive as
the primary way that a QueryException is
+ * generated is through a child class. However, those child classes are *not*
equivalent to a QueryException, instead
+ * they act as a Factory of QueryException objects. This can be seen in two
different places.
+ * <p>
+ * 1. When sanitize() is called, the response is a QueryException without any
indication of which original exception
+ * occurred.
+ * 2. When these objects get serialized across the wire the recipient
deserializes a QueryException. The client is
+ * never expected, and fundamentally is not allowed to, ever deserialize a
child class of QueryException.
+ * <p>
+ * For this reason, QueryException must contain all potential state that any
of its child classes could ever want to
+ * push across the wire. Additionally, any catch clauses expecting one of the
child Exceptions must know that it is
+ * running inside of code where the exception has not traveled across the
wire. If there is a chance that the
+ * exception could have been serialized across the wire, the code must catch a
QueryException and check the errorCode
+ * instead.
+ * <p>
+ * As a corollary, adding new state or adjusting the logic of this class must
always be done in a backwards-compatible
+ * fashion across all child classes of QueryException.
+ * <p>
+ * If there is any need to do different logic based on the type of error that
has happened, the only reliable method
+ * of discerning the type of the error is to look at the errorCode String.
Because these Strings are considered part
+ * of the API, they are not allowed to change and must maintain their same
semantics. The known errorCode Strings
+ * are pulled together as public static fields on this class in order to make
it more clear what options exist.
+ * <p>
* QueryResource and SqlResource are expected to emit the JSON form of this
object when errors happen.
*/
public class QueryException extends RuntimeException implements
SanitizableException
{
+ /**
+ * Error codes
Review Comment:
We could maybe capture the error codes into an enum. Each error code could
then be associated with an error message and a fail type. I don't think the
error code is persisted anywhere, so we should be okay with using an enum.
##########
core/src/main/java/org/apache/druid/query/QueryException.java:
##########
@@ -30,12 +29,126 @@
import java.util.function.Function;
/**
- * Base serializable error response
- *
+ * Base serializable error response.
+ * <p>
+ * The Object Model that QueryException follows is a little non-intuitive as
the primary way that a QueryException is
+ * generated is through a child class. However, those child classes are *not*
equivalent to a QueryException, instead
+ * they act as a Factory of QueryException objects. This can be seen in two
different places.
+ * <p>
+ * 1. When sanitize() is called, the response is a QueryException without any
indication of which original exception
+ * occurred.
+ * 2. When these objects get serialized across the wire the recipient
deserializes a QueryException. The client is
+ * never expected, and fundamentally is not allowed to, ever deserialize a
child class of QueryException.
+ * <p>
+ * For this reason, QueryException must contain all potential state that any
of its child classes could ever want to
+ * push across the wire. Additionally, any catch clauses expecting one of the
child Exceptions must know that it is
+ * running inside of code where the exception has not traveled across the
wire. If there is a chance that the
+ * exception could have been serialized across the wire, the code must catch a
QueryException and check the errorCode
+ * instead.
+ * <p>
+ * As a corollary, adding new state or adjusting the logic of this class must
always be done in a backwards-compatible
+ * fashion across all child classes of QueryException.
+ * <p>
+ * If there is any need to do different logic based on the type of error that
has happened, the only reliable method
+ * of discerning the type of the error is to look at the errorCode String.
Because these Strings are considered part
+ * of the API, they are not allowed to change and must maintain their same
semantics. The known errorCode Strings
+ * are pulled together as public static fields on this class in order to make
it more clear what options exist.
+ * <p>
* QueryResource and SqlResource are expected to emit the JSON form of this
object when errors happen.
*/
public class QueryException extends RuntimeException implements
SanitizableException
{
+ /**
+ * Error codes
+ */
+ public static final String JSON_PARSE_ERROR_CODE = "Json parse failed";
+ public static final String BAD_QUERY_CONTEXT_ERROR_CODE = "Query context
parse failed";
+ public static final String QUERY_CAPACITY_EXCEEDED_ERROR_CODE = "Query
capacity exceeded";
+ public static final String QUERY_INTERRUPTED_ERROR_CODE = "Query
interrupted";
+ // Note: the proper spelling is with a single "l", but the version with
Review Comment:
Yeah, it's a UK vs. US English spelling difference 😅
##########
services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java:
##########
@@ -503,7 +506,7 @@ public ErrorResponseTransformStrategy
getErrorResponseTransformStrategy()
ArgumentCaptor<Exception> captor =
ArgumentCaptor.forClass(Exception.class);
Mockito.verify(mockMapper).writeValue(ArgumentMatchers.eq(outputStream),
captor.capture());
Assert.assertTrue(captor.getValue() instanceof QueryException);
- Assert.assertEquals(QueryInterruptedException.UNKNOWN_EXCEPTION,
((QueryException) captor.getValue()).getErrorCode());
+ Assert.assertEquals("Unknown exception", ((QueryException)
captor.getValue()).getErrorCode());
Review Comment:
Nit: use the `UNKNOWN_ERROR_CODE` constant instead of the hard-coded string literal.
##########
core/src/main/java/org/apache/druid/query/QueryException.java:
##########
@@ -30,12 +29,126 @@
import java.util.function.Function;
/**
- * Base serializable error response
- *
+ * Base serializable error response.
+ * <p>
+ * The Object Model that QueryException follows is a little non-intuitive as
the primary way that a QueryException is
+ * generated is through a child class. However, those child classes are *not*
equivalent to a QueryException, instead
+ * they act as a Factory of QueryException objects. This can be seen in two
different places.
+ * <p>
+ * 1. When sanitize() is called, the response is a QueryException without any
indication of which original exception
+ * occurred.
+ * 2. When these objects get serialized across the wire the recipient
deserializes a QueryException. The client is
+ * never expected, and fundamentally is not allowed to, ever deserialize a
child class of QueryException.
+ * <p>
+ * For this reason, QueryException must contain all potential state that any
of its child classes could ever want to
+ * push across the wire. Additionally, any catch clauses expecting one of the
child Exceptions must know that it is
+ * running inside of code where the exception has not traveled across the
wire. If there is a chance that the
+ * exception could have been serialized across the wire, the code must catch a
QueryException and check the errorCode
+ * instead.
+ * <p>
+ * As a corollary, adding new state or adjusting the logic of this class must
always be done in a backwards-compatible
+ * fashion across all child classes of QueryException.
+ * <p>
+ * If there is any need to do different logic based on the type of error that
has happened, the only reliable method
+ * of discerning the type of the error is to look at the errorCode String.
Because these Strings are considered part
+ * of the API, they are not allowed to change and must maintain their same
semantics. The known errorCode Strings
+ * are pulled together as public static fields on this class in order to make
it more clear what options exist.
+ * <p>
* QueryResource and SqlResource are expected to emit the JSON form of this
object when errors happen.
*/
public class QueryException extends RuntimeException implements
SanitizableException
{
+ /**
+ * Error codes
+ */
+ public static final String JSON_PARSE_ERROR_CODE = "Json parse failed";
+ public static final String BAD_QUERY_CONTEXT_ERROR_CODE = "Query context
parse failed";
+ public static final String QUERY_CAPACITY_EXCEEDED_ERROR_CODE = "Query
capacity exceeded";
+ public static final String QUERY_INTERRUPTED_ERROR_CODE = "Query
interrupted";
+ // Note: the proper spelling is with a single "l", but the version with
Review Comment:
Just realized that this comment has been copied over.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]