kfaraz commented on code in PR #13564:
URL: https://github.com/apache/druid/pull/13564#discussion_r1049170777


##########
core/src/main/java/org/apache/druid/query/QueryException.java:
##########
@@ -30,12 +29,126 @@
 import java.util.function.Function;
 
 /**
- * Base serializable error response
- *
+ * Base serializable error response.
+ * <p>
+ * The Object Model that QueryException follows is a little non-intuitive as the primary way that a QueryException is
+ * generated is through a child class.  However, those child classes are *not* equivalent to a QueryException, instead
+ * they act as a Factory of QueryException objects.  This can be seen in two different places.
+ * <p>
+ * 1. When sanitize() is called, the response is a QueryException without any indication of which original exception
+ * occurred.
+ * 2. When these objects get serialized across the wire the recipient deserializes a QueryException. The client is
+ * never expected, and fundamentally is not allowed to, ever deserialize a child class of QueryException.
+ * <p>
+ * For this reason, QueryException must contain all potential state that any of its child classes could ever want to
+ * push across the wire.  Additionally, any catch clauses expecting one of the child Exceptions must know that it is
+ * running inside of code where the exception has not traveled across the wire.  If there is a chance that the
+ * exception could have been serialized across the wire, the code must catch a QueryException and check the errorCode
+ * instead.
+ * <p>
+ * As a correlary, adding new state or adjusting the logic of this class must always be done in a backwards-compatible

Review Comment:
   Rather than trying to always maintain backwards compatibility with all the 
child exceptions, do you think it would be better if we get rid of all the 
child exceptions and always just throw a `QueryException` with the correct 
error code?
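For reference, here is roughly what the errorCode-based handling that the Javadoc prescribes would look like. Dropping the child exceptions would make this the only option. `SketchQueryException` and `ErrorCodeDispatch` are hypothetical stand-ins, not Druid classes. The error-code strings are copied from the constants in this PR, and the retry policy is purely illustrative.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for QueryException: after crossing the wire only the
 * error code survives, so it is the sole thing callers may branch on.
 */
class SketchQueryException extends RuntimeException
{
  // Strings copied from the constants proposed in this PR.
  static final String QUERY_CAPACITY_EXCEEDED_ERROR_CODE = "Query capacity exceeded";
  static final String QUERY_INTERRUPTED_ERROR_CODE = "Query interrupted";

  private final String errorCode;

  SketchQueryException(String errorCode, String message)
  {
    super(message);
    this.errorCode = Objects.requireNonNull(errorCode, "errorCode");
  }

  String getErrorCode()
  {
    return errorCode;
  }
}

final class ErrorCodeDispatch
{
  private ErrorCodeDispatch()
  {
  }

  /**
   * Branches on the error code rather than on the Java subtype, so it behaves the same
   * whether the exception was thrown locally or deserialized from a remote server.
   * The retry policy itself is illustrative only.
   */
  static boolean isRetryable(Throwable t)
  {
    if (!(t instanceof SketchQueryException)) {
      return false;
    }
    switch (((SketchQueryException) t).getErrorCode()) {
      case SketchQueryException.QUERY_CAPACITY_EXCEEDED_ERROR_CODE:
        return true;
      case SketchQueryException.QUERY_INTERRUPTED_ERROR_CODE:
      default:
        return false;
    }
  }
}
```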



##########
server/src/main/java/org/apache/druid/server/QueryResource.java:
##########
@@ -171,161 +167,112 @@ public Response cancelQuery(@PathParam("id") String queryId, @Context final Http
   @POST
   @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
   @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE})
+  @Nullable
   public Response doPost(
       final InputStream in,
       @QueryParam("pretty") final String pretty,
-
-      // used to get request content-type,Accept header, remote address and auth-related headers
       @Context final HttpServletRequest req
   ) throws IOException
   {
     final QueryLifecycle queryLifecycle = queryLifecycleFactory.factorize();
 
-    final ResourceIOReaderWriter ioReaderWriter = createResourceIOReaderWriter(req, pretty != null);
+    final ResourceIOReaderWriter io = createResourceIOReaderWriter(req, pretty != null);
 
     final String currThreadName = Thread.currentThread().getName();
     try {
-      final Query<?> query = readQuery(req, in, ioReaderWriter);
+      final Query<?> query;
+      try {
+        query = readQuery(req, in, io);
+      }
+      catch (QueryException e) {
+        return io.getResponseWriter().buildNonOkResponse(e.getFailType().getExpectedStatus(), e);
+      }
+
       queryLifecycle.initialize(query);
-      final String queryId = queryLifecycle.getQueryId();
       final String queryThreadName = queryLifecycle.threadName(currThreadName);
       Thread.currentThread().setName(queryThreadName);
 
       if (log.isDebugEnabled()) {
         log.debug("Got query [%s]", queryLifecycle.getQuery());
       }
 
-      final Access authResult = queryLifecycle.authorize(req);
+      final Access authResult;
+      try {
+        authResult = queryLifecycle.authorize(req);
+      }
+      catch (RuntimeException e) {
+        return io.getResponseWriter()
+                 .buildNonOkResponse(
+                     HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+                     QueryInterruptedException.wrapIfNeeded(e)
+                 );
+      }
       if (!authResult.isAllowed()) {
         throw new ForbiddenException(authResult.toString());
       }
 
-      final QueryResponse<?> queryResponse = queryLifecycle.execute();
-      final Sequence<?> results = queryResponse.getResults();
-      final ResponseContext responseContext = queryResponse.getResponseContext();
-      final String prevEtag = getPreviousEtag(req);
-
-      if (prevEtag != null && prevEtag.equals(responseContext.getEntityTag())) {
-        queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), -1);
-        successfulQueryCount.incrementAndGet();
-        return Response.notModified().build();
-      }
-
-      final Yielder<?> yielder = Yielders.each(results);
+      // We use an async context not because we are actually going to run this async, but because we want to delay
+      // the decision of what the response code should be until we have gotten the first few data points to return.
+      // Returning a Response object from this point forward requires that object to know the status code, which we
+      // don't actually know until we are in the accumulator, but if we try to return a Response object from the
+      // accumulator, we cannot properly stream results back, because the accumulator won't release control of the
+      // Response until it has consumed the underlying Sequence.
+      final AsyncContext asyncContext = req.startAsync();
 
       try {
-        final ObjectWriter jsonWriter = queryLifecycle.newOutputWriter(ioReaderWriter);
-
-        Response.ResponseBuilder responseBuilder = Response
-            .ok(
-                new StreamingOutput()
-                {
-                  @Override
-                  public void write(OutputStream outputStream) throws WebApplicationException
-                  {
-                    Exception e = null;
-
-                    CountingOutputStream os = new CountingOutputStream(outputStream);
-                    try {
-                      // json serializer will always close the yielder
-                      jsonWriter.writeValue(os, yielder);
-
-                      os.flush(); // Some types of OutputStream suppress flush errors in the .close() method.
-                      os.close();
-                    }
-                    catch (Exception ex) {
-                      e = ex;
-                      log.noStackTrace().error(ex, "Unable to send query response.");
-                      throw new RuntimeException(ex);
-                    }
-                    finally {
-                      Thread.currentThread().setName(currThreadName);
-
-                      queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), os.getCount());
-
-                      if (e == null) {
-                        successfulQueryCount.incrementAndGet();
-                      } else {
-                        failedQueryCount.incrementAndGet();
-                      }
-                    }
-                  }
-                },
-                ioReaderWriter.getResponseWriter().getResponseType()
-            )
-            .header(QUERY_ID_RESPONSE_HEADER, queryId);
-
-        attachResponseContextToHttpResponse(queryId, responseContext, responseBuilder, jsonMapper,
-                                            responseContextConfig, selfNode
-        );
-
-        return responseBuilder.build();
+        new QueryResourceResultPusher(req, queryLifecycle, io, (HttpServletResponse) asyncContext.getResponse())
+            .push();
       }
-      catch (QueryException e) {
-        // make sure to close yielder if anything happened before starting to serialize the response.
-        yielder.close();
+      finally {
+        asyncContext.complete();
+      }
+    }
+    catch (Exception e) {
+      if (e instanceof ForbiddenException && !req.isAsyncStarted()) {

Review Comment:
   Wouldn't the `ForbiddenException` be caught already while doing the 
`queryLifecycle.authorize()` above?
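The new async-context comment in this hunk explains why the status is chosen late: wait for the first results before picking a status code. That reasoning can be modelled without any servlet machinery. The sketch below is a simplification under stated assumptions. `DeferredStatusPusher` is hypothetical, a plain `Iterator` stands in for the Druid `Sequence`, and the 200/500 status values are illustrative. It pulls the first row before committing a status, so a failure at that point can still become an error response. A later failure can only truncate a 200 stream that is already committed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of deferring the HTTP status until the first result is in hand. */
final class DeferredStatusPusher
{
  /** What a client would observe: the status sent and the rows it received. */
  static final class Outcome
  {
    final int status;
    final List<String> rows;

    Outcome(int status, List<String> rows)
    {
      this.status = status;
      this.rows = rows;
    }
  }

  private DeferredStatusPusher()
  {
  }

  static Outcome push(Iterator<String> results)
  {
    final List<String> rows = new ArrayList<>();
    boolean hasFirst = false;
    String first = null;
    try {
      // Nothing has been written yet, so a failure here can still become an error status.
      hasFirst = results.hasNext();
      if (hasFirst) {
        first = results.next();
      }
    }
    catch (RuntimeException e) {
      return new Outcome(500, rows);
    }

    // From here on the status is committed as 200 and cannot change once rows are streamed.
    if (hasFirst) {
      rows.add(first);
    }
    try {
      while (results.hasNext()) {
        rows.add(results.next());
      }
    }
    catch (RuntimeException e) {
      // Too late to change the status: the client sees a 200 with a truncated body.
    }
    return new Outcome(200, rows);
  }
}
```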



##########
server/src/main/java/org/apache/druid/server/QueryResource.java:
##########
@@ -565,4 +496,142 @@ public static void transferEntityTag(ResponseContext context, Response.ResponseB
       builder.header(HEADER_ETAG, entityTag);
     }
   }
+
+  private class QueryResourceQueryMetricCounter implements QueryMetricCounter
+  {
+    @Override
+    public void incrementSuccess()
+    {
+      successfulQueryCount.incrementAndGet();

Review Comment:
   Nit: The different `AtomicLong`s for the counts can now be made member 
fields of this class itself.
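Here is a minimal sketch of that suggestion. It assumes the `QueryMetricCounter` interface keeps the four no-arg callbacks shown in this diff. `AtomicQueryMetricCounter` and its getters are hypothetical names. The point is that the counter owns its `AtomicLong`s instead of reaching into the enclosing resource's fields.

```java
import java.util.concurrent.atomic.AtomicLong;

/** The four callbacks that QueryMetricCounter declares in this diff. */
interface QueryMetricCounter
{
  void incrementSuccess();

  void incrementFailed();

  void incrementInterrupted();

  void incrementTimedOut();
}

/** Hypothetical counter that owns its state, so QueryResource no longer needs the AtomicLong fields. */
final class AtomicQueryMetricCounter implements QueryMetricCounter
{
  private final AtomicLong successful = new AtomicLong();
  private final AtomicLong failed = new AtomicLong();
  private final AtomicLong interrupted = new AtomicLong();
  private final AtomicLong timedOut = new AtomicLong();

  @Override
  public void incrementSuccess()
  {
    successful.incrementAndGet();
  }

  @Override
  public void incrementFailed()
  {
    failed.incrementAndGet();
  }

  @Override
  public void incrementInterrupted()
  {
    interrupted.incrementAndGet();
  }

  @Override
  public void incrementTimedOut()
  {
    timedOut.incrementAndGet();
  }

  long getSuccessfulQueryCount()
  {
    return successful.get();
  }

  long getFailedQueryCount()
  {
    return failed.get();
  }

  long getInterruptedQueryCount()
  {
    return interrupted.get();
  }

  long getTimedOutQueryCount()
  {
    return timedOut.get();
  }
}
```

A no-op implementation such as `SqlResourceQueryMetricCounter` would then be the only other implementor, and the resource would expose counts by delegating to its counter instance.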



##########
sql/src/main/java/org/apache/druid/sql/http/SqlResource.java:
##########
@@ -320,4 +181,207 @@ public Response cancelQuery(
       return Response.status(Status.FORBIDDEN).build();
     }
   }
+
+  /**
+   * The SqlResource only generates metrics and doesn't keep track of aggregate counts of successful/failed/interrupted
+   * queries, so this implementation is effectively just a noop.
+   */
+  private static class SqlResourceQueryMetricCounter implements QueryResource.QueryMetricCounter
+  {
+    @Override
+    public void incrementSuccess()
+    {
+
+    }
+
+    @Override
+    public void incrementFailed()
+    {
+
+    }
+
+    @Override
+    public void incrementInterrupted()
+    {
+
+    }
+
+    @Override
+    public void incrementTimedOut()
+    {
+
+    }
+  }
+
+  private class SqlResourceResultPusher extends ResultPusher
+  {
+    private final String sqlQueryId;
+    private final HttpStatement stmt;
+    private final SqlQuery sqlQuery;
+
+    public SqlResourceResultPusher(
+        AsyncContext asyncContext,
+        String sqlQueryId,
+        HttpStatement stmt,
+        SqlQuery sqlQuery
+    )
+    {
+      super(
+          (HttpServletResponse) asyncContext.getResponse(),
+          SqlResource.this.jsonMapper,
+          SqlResource.this.responseContextConfig,
+          SqlResource.this.selfNode,
+          SqlResource.QUERY_METRIC_COUNTER,
+          sqlQueryId,
+          MediaType.APPLICATION_JSON_TYPE
+      );
+      this.sqlQueryId = sqlQueryId;
+      this.stmt = stmt;
+      this.sqlQuery = sqlQuery;
+    }
+
+    @Override
+    public ResultsWriter start()
+    {
+      return new ResultsWriter()
+      {
+        private ResultSet thePlan;
+
+        @Override
+        @Nullable
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        public QueryResponse<Object> start(HttpServletResponse response)
+        {
+          response.setHeader(SQL_QUERY_ID_RESPONSE_HEADER, sqlQueryId);
+
+          final QueryResponse<Object[]> retVal;
+          try {
+            thePlan = stmt.plan();
+            retVal = thePlan.run();
+          }
+          catch (RelOptPlanner.CannotPlanException e) {
+            recordFailure(e);
+            final SqlPlanningException wrappedException = new SqlPlanningException(
+                SqlPlanningException.PlanningError.UNSUPPORTED_SQL_ERROR,
+                e.getMessage()
+            );
+
+            writeErrorResponse(HttpServletResponse.SC_BAD_REQUEST, response, wrappedException);
+            return null;
+          }
+          // There is a claim that Calcite sometimes throws a java.lang.AssertionError, but we do not have a test that can
+          // reproduce it checked into the code (the best we have is something that uses mocks to throw an Error, which is
+          // dubious at best).  We keep this just in case, but it might be best to remove it and see where the
+          // AssertionErrors are coming from and do something to ensure that they don't actually make it out of Calcite
+          catch (AssertionError e) {
+            log.warn(e, "AssertionError killed query: %s", sqlQuery);
+            final QueryInterruptedException wrappedEx = QueryInterruptedException.wrapIfNeeded(e);

Review Comment:
   Should this be a `QueryException` with `UNSUPPORTED_QUERY_ERROR_CODE`?



##########
server/src/main/java/org/apache/druid/server/QueryResource.java:
##########
@@ -171,161 +167,112 @@ public Response cancelQuery(@PathParam("id") String queryId, @Context final Http
   @POST
   @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
   @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE})
+  @Nullable
   public Response doPost(
       final InputStream in,
       @QueryParam("pretty") final String pretty,
-
-      // used to get request content-type,Accept header, remote address and auth-related headers
       @Context final HttpServletRequest req
   ) throws IOException
   {
     final QueryLifecycle queryLifecycle = queryLifecycleFactory.factorize();
 
-    final ResourceIOReaderWriter ioReaderWriter = createResourceIOReaderWriter(req, pretty != null);
+    final ResourceIOReaderWriter io = createResourceIOReaderWriter(req, pretty != null);
 
     final String currThreadName = Thread.currentThread().getName();
     try {
-      final Query<?> query = readQuery(req, in, ioReaderWriter);
+      final Query<?> query;
+      try {
+        query = readQuery(req, in, io);
+      }
+      catch (QueryException e) {
+        return io.getResponseWriter().buildNonOkResponse(e.getFailType().getExpectedStatus(), e);
+      }
+
       queryLifecycle.initialize(query);
-      final String queryId = queryLifecycle.getQueryId();
       final String queryThreadName = queryLifecycle.threadName(currThreadName);
       Thread.currentThread().setName(queryThreadName);
 
       if (log.isDebugEnabled()) {
         log.debug("Got query [%s]", queryLifecycle.getQuery());
       }
 
-      final Access authResult = queryLifecycle.authorize(req);
+      final Access authResult;
+      try {
+        authResult = queryLifecycle.authorize(req);
+      }
+      catch (RuntimeException e) {
+        return io.getResponseWriter()
+                 .buildNonOkResponse(
+                     HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+                     QueryInterruptedException.wrapIfNeeded(e)

Review Comment:
   Should this be a `QueryException` with `UNAUTHORIZED_ERROR_CODE` instead?



##########
core/src/main/java/org/apache/druid/query/QueryException.java:
##########
@@ -30,12 +29,126 @@
 import java.util.function.Function;
 
 /**
- * Base serializable error response
- *
+ * Base serializable error response.
+ * <p>
+ * The Object Model that QueryException follows is a little non-intuitive as the primary way that a QueryException is
+ * generated is through a child class.  However, those child classes are *not* equivalent to a QueryException, instead
+ * they act as a Factory of QueryException objects.  This can be seen in two different places.
+ * <p>
+ * 1. When sanitize() is called, the response is a QueryException without any indication of which original exception
+ * occurred.
+ * 2. When these objects get serialized across the wire the recipient deserializes a QueryException. The client is
+ * never expected, and fundamentally is not allowed to, ever deserialize a child class of QueryException.
+ * <p>
+ * For this reason, QueryException must contain all potential state that any of its child classes could ever want to
+ * push across the wire.  Additionally, any catch clauses expecting one of the child Exceptions must know that it is
+ * running inside of code where the exception has not traveled across the wire.  If there is a chance that the
+ * exception could have been serialized across the wire, the code must catch a QueryException and check the errorCode
+ * instead.
+ * <p>
+ * As a correlary, adding new state or adjusting the logic of this class must always be done in a backwards-compatible
+ * fashion across all child classes of QueryException.
+ * <p>
+ * If there is any need to do different logic based on the type of error that has happened, the only reliable method
+ * of discerning the type of the error is to look at the errorCode String.  Because these Strings are considered part
+ * of the API, they are not allowed to change and must maintain their same semantics.  The known errorCode Strings
+ * are pulled together as public static fields on this class in order to make it more clear what options exist.
+ * <p>
  * QueryResource and SqlResource are expected to emit the JSON form of this object when errors happen.
  */
 public class QueryException extends RuntimeException implements SanitizableException
 {
+  /**
+   * Error codes

Review Comment:
   We could maybe capture the error codes in an enum. Each error code could
then be associated with an error message and a fail type. I don't think the
error code is persisted anywhere, so we should be okay with using an enum.
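Here is one possible shape for that enum, as a sketch only. `QueryErrorCode` and `SketchFailType` are hypothetical names. The code strings are copied from the constants added in this diff, plus the `"Unknown exception"` string used in the test change. The HTTP status attached to each fail type is an illustrative assumption, not Druid's actual mapping. Because the strings travel over the wire, the enum keeps them verbatim and falls back to `UNKNOWN` for a code it does not recognize.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical fail types; the status codes are illustrative, not Druid's real mapping. */
enum SketchFailType
{
  USER_ERROR(400),
  CAPACITY_EXCEEDED(429),
  SERVER_ERROR(500);

  private final int expectedStatus;

  SketchFailType(int expectedStatus)
  {
    this.expectedStatus = expectedStatus;
  }

  int getExpectedStatus()
  {
    return expectedStatus;
  }
}

/** Each constant keeps its wire string unchanged so existing clients can still match on it. */
enum QueryErrorCode
{
  JSON_PARSE("Json parse failed", SketchFailType.USER_ERROR),
  BAD_QUERY_CONTEXT("Query context parse failed", SketchFailType.USER_ERROR),
  QUERY_CAPACITY_EXCEEDED("Query capacity exceeded", SketchFailType.CAPACITY_EXCEEDED),
  QUERY_INTERRUPTED("Query interrupted", SketchFailType.SERVER_ERROR),
  UNKNOWN("Unknown exception", SketchFailType.SERVER_ERROR);

  // Initialized after the enum constants above, so values() is fully populated here.
  private static final Map<String, QueryErrorCode> BY_CODE =
      Arrays.stream(values()).collect(Collectors.toMap(QueryErrorCode::getCode, Function.identity()));

  private final String code;
  private final SketchFailType failType;

  QueryErrorCode(String code, SketchFailType failType)
  {
    this.code = code;
    this.failType = failType;
  }

  String getCode()
  {
    return code;
  }

  SketchFailType getFailType()
  {
    return failType;
  }

  /** Resolves a wire string; codes this version does not know about map to UNKNOWN. */
  static QueryErrorCode fromCode(String code)
  {
    return BY_CODE.getOrDefault(code, UNKNOWN);
  }
}
```

The fallback in `fromCode` matters because, per the Javadoc, the strings are part of the API. An older client may well receive a code that only a newer server knows about.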



##########
core/src/main/java/org/apache/druid/query/QueryException.java:
##########
@@ -30,12 +29,126 @@
 import java.util.function.Function;
 
 /**
- * Base serializable error response
- *
+ * Base serializable error response.
+ * <p>
+ * The Object Model that QueryException follows is a little non-intuitive as the primary way that a QueryException is
+ * generated is through a child class.  However, those child classes are *not* equivalent to a QueryException, instead
+ * they act as a Factory of QueryException objects.  This can be seen in two different places.
+ * <p>
+ * 1. When sanitize() is called, the response is a QueryException without any indication of which original exception
+ * occurred.
+ * 2. When these objects get serialized across the wire the recipient deserializes a QueryException. The client is
+ * never expected, and fundamentally is not allowed to, ever deserialize a child class of QueryException.
+ * <p>
+ * For this reason, QueryException must contain all potential state that any of its child classes could ever want to
+ * push across the wire.  Additionally, any catch clauses expecting one of the child Exceptions must know that it is
+ * running inside of code where the exception has not traveled across the wire.  If there is a chance that the
+ * exception could have been serialized across the wire, the code must catch a QueryException and check the errorCode
+ * instead.
+ * <p>
+ * As a correlary, adding new state or adjusting the logic of this class must always be done in a backwards-compatible
+ * fashion across all child classes of QueryException.
+ * <p>
+ * If there is any need to do different logic based on the type of error that has happened, the only reliable method
+ * of discerning the type of the error is to look at the errorCode String.  Because these Strings are considered part
+ * of the API, they are not allowed to change and must maintain their same semantics.  The known errorCode Strings
+ * are pulled together as public static fields on this class in order to make it more clear what options exist.
+ * <p>
  * QueryResource and SqlResource are expected to emit the JSON form of this object when errors happen.
  */
 public class QueryException extends RuntimeException implements SanitizableException
 {
+  /**
+   * Error codes
+   */
+  public static final String JSON_PARSE_ERROR_CODE = "Json parse failed";
+  public static final String BAD_QUERY_CONTEXT_ERROR_CODE = "Query context parse failed";
+  public static final String QUERY_CAPACITY_EXCEEDED_ERROR_CODE = "Query capacity exceeded";
+  public static final String QUERY_INTERRUPTED_ERROR_CODE = "Query interrupted";
+  // Note: the proper spelling is with a single "l", but the version with

Review Comment:
   yeah, it's a UK vs US English thing 😅



##########
services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java:
##########
@@ -503,7 +506,7 @@ public ErrorResponseTransformStrategy getErrorResponseTransformStrategy()
     ArgumentCaptor<Exception> captor = ArgumentCaptor.forClass(Exception.class);
     Mockito.verify(mockMapper).writeValue(ArgumentMatchers.eq(outputStream), captor.capture());
     Assert.assertTrue(captor.getValue() instanceof QueryException);
-    Assert.assertEquals(QueryInterruptedException.UNKNOWN_EXCEPTION, ((QueryException) captor.getValue()).getErrorCode());
+    Assert.assertEquals("Unknown exception", ((QueryException) captor.getValue()).getErrorCode());

Review Comment:
   Nit: use the `UNKNOWN_ERROR_CODE` constant instead.



##########
core/src/main/java/org/apache/druid/query/QueryException.java:
##########
@@ -30,12 +29,126 @@
 import java.util.function.Function;
 
 /**
- * Base serializable error response
- *
+ * Base serializable error response.
+ * <p>
+ * The Object Model that QueryException follows is a little non-intuitive as the primary way that a QueryException is
+ * generated is through a child class.  However, those child classes are *not* equivalent to a QueryException, instead
+ * they act as a Factory of QueryException objects.  This can be seen in two different places.
+ * <p>
+ * 1. When sanitize() is called, the response is a QueryException without any indication of which original exception
+ * occurred.
+ * 2. When these objects get serialized across the wire the recipient deserializes a QueryException. The client is
+ * never expected, and fundamentally is not allowed to, ever deserialize a child class of QueryException.
+ * <p>
+ * For this reason, QueryException must contain all potential state that any of its child classes could ever want to
+ * push across the wire.  Additionally, any catch clauses expecting one of the child Exceptions must know that it is
+ * running inside of code where the exception has not traveled across the wire.  If there is a chance that the
+ * exception could have been serialized across the wire, the code must catch a QueryException and check the errorCode
+ * instead.
+ * <p>
+ * As a correlary, adding new state or adjusting the logic of this class must always be done in a backwards-compatible
+ * fashion across all child classes of QueryException.
+ * <p>
+ * If there is any need to do different logic based on the type of error that has happened, the only reliable method
+ * of discerning the type of the error is to look at the errorCode String.  Because these Strings are considered part
+ * of the API, they are not allowed to change and must maintain their same semantics.  The known errorCode Strings
+ * are pulled together as public static fields on this class in order to make it more clear what options exist.
+ * <p>
  * QueryResource and SqlResource are expected to emit the JSON form of this object when errors happen.
  */
 public class QueryException extends RuntimeException implements SanitizableException
 {
+  /**
+   * Error codes
+   */
+  public static final String JSON_PARSE_ERROR_CODE = "Json parse failed";
+  public static final String BAD_QUERY_CONTEXT_ERROR_CODE = "Query context parse failed";
+  public static final String QUERY_CAPACITY_EXCEEDED_ERROR_CODE = "Query capacity exceeded";
+  public static final String QUERY_INTERRUPTED_ERROR_CODE = "Query interrupted";
+  // Note: the proper spelling is with a single "l", but the version with

Review Comment:
   Just realized that this comment has been copied over.



-- 
This is an automated message from the Apache Git Service.