kfaraz commented on code in PR #13564: URL: https://github.com/apache/druid/pull/13564#discussion_r1049294317
########## server/src/main/java/org/apache/druid/server/ResultPusher.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.QueryException; +import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.server.security.ForbiddenException; + +import javax.annotation.Nullable; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.MediaType; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; + +public abstract class ResultPusher Review Comment: Maybe rename this to `QueryResultPusher` as it is specific to queries. ########## server/src/main/java/org/apache/druid/server/ResultPusher.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.QueryException; +import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.server.security.ForbiddenException; + +import javax.annotation.Nullable; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.MediaType; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; + +public abstract class ResultPusher +{ + private static final Logger log = new Logger(ResultPusher.class); + + private final HttpServletResponse response; + private final String queryId; + private final ObjectMapper jsonMapper; + private final ResponseContextConfig responseContextConfig; + private final DruidNode selfNode; + private final QueryResource.QueryMetricCounter counter; + private final MediaType contentType; + + private StreamingHttpResponseAccumulator accumulator = null; + + public ResultPusher( + HttpServletResponse response, + ObjectMapper jsonMapper, + ResponseContextConfig responseContextConfig, + DruidNode selfNode, + QueryResource.QueryMetricCounter counter, + String queryId, + MediaType contentType + ) + { + this.response = response; + this.queryId = 
queryId; + this.jsonMapper = jsonMapper; + this.responseContextConfig = responseContextConfig; + this.selfNode = selfNode; + this.counter = counter; + this.contentType = contentType; + } + + /** + * Starts a ResultsWriter, which encapsulates logic to run the query, serialize it and also report success/failure Review Comment: IIUC, this method really starts the `ResultPusher`. The `ResultsWriter` is started (and executed) when calling its own `start` as `resultsWriter.start()`. So I guess a rephrase might be better: ```suggestion * Starts the ResultPusher and returns a new ResultsWriter, which encapsulates logic to run the query, serialize it and also report success/failure ``` ########## sql/src/main/java/org/apache/druid/sql/http/SqlResource.java: ########## @@ -275,7 +275,10 @@ public QueryResponse<Object> start(HttpServletResponse response) // AssertionErrors are coming from and do something to ensure that they don't actually make it out of Calcite catch (AssertionError e) { log.warn(e, "AssertionError killed query: %s", sqlQuery); - final QueryInterruptedException wrappedEx = QueryInterruptedException.wrapIfNeeded(e); + + // We wrap the exception here so that we get the sanitization. java.lang.AssertionError apparently + // doesn't implement org.apache.druid.common.exception.SanitizableException. Review Comment: 😂 ########## sql/src/main/java/org/apache/druid/sql/http/SqlResource.java: ########## @@ -320,4 +181,210 @@ public Response cancelQuery( return Response.status(Status.FORBIDDEN).build(); } } + + /** + * The SqlResource only generates metrics and doesn't keep track of aggregate counts of successful/failed/interrupted + * queries, so this implementation is effectively just a noop. 
+ */ + private static class SqlResourceQueryMetricCounter implements QueryResource.QueryMetricCounter + { + @Override + public void incrementSuccess() + { + + } + + @Override + public void incrementFailed() + { + + } + + @Override + public void incrementInterrupted() + { + + } + + @Override + public void incrementTimedOut() + { + + } + } + + private class SqlResourceResultPusher extends ResultPusher + { + private final String sqlQueryId; + private final HttpStatement stmt; + private final SqlQuery sqlQuery; + + public SqlResourceResultPusher( + AsyncContext asyncContext, + String sqlQueryId, + HttpStatement stmt, + SqlQuery sqlQuery + ) + { + super( + (HttpServletResponse) asyncContext.getResponse(), + SqlResource.this.jsonMapper, + SqlResource.this.responseContextConfig, + SqlResource.this.selfNode, + SqlResource.QUERY_METRIC_COUNTER, + sqlQueryId, + MediaType.APPLICATION_JSON_TYPE + ); + this.sqlQueryId = sqlQueryId; + this.stmt = stmt; + this.sqlQuery = sqlQuery; + } + + @Override + public ResultsWriter start() + { + return new ResultsWriter() + { + private ResultSet thePlan; + + @Override + @Nullable + @SuppressWarnings({"unchecked", "rawtypes"}) + public QueryResponse<Object> start(HttpServletResponse response) + { + response.setHeader(SQL_QUERY_ID_RESPONSE_HEADER, sqlQueryId); + + final QueryResponse<Object[]> retVal; + try { + thePlan = stmt.plan(); + retVal = thePlan.run(); + } + catch (RelOptPlanner.CannotPlanException e) { + recordFailure(e); Review Comment: Instead of recording failure and writing error response, should we just throw a `QueryException` of some sort here and let `ResultPusher` do the rest? 
########## server/src/main/java/org/apache/druid/server/QueryResource.java: ########## @@ -565,4 +501,142 @@ public static void transferEntityTag(ResponseContext context, Response.ResponseB builder.header(HEADER_ETAG, entityTag); } } + + private class QueryResourceQueryMetricCounter implements QueryMetricCounter + { + @Override + public void incrementSuccess() + { + successfulQueryCount.incrementAndGet(); + } + + @Override + public void incrementFailed() + { + failedQueryCount.incrementAndGet(); + } + + @Override + public void incrementInterrupted() + { + interruptedQueryCount.incrementAndGet(); + } + + @Override + public void incrementTimedOut() + { + timedOutQueryCount.incrementAndGet(); + } + } + + private class QueryResourceResultPusher extends ResultPusher + { + private final HttpServletRequest req; + private final QueryLifecycle queryLifecycle; + private final ResourceIOReaderWriter io; + + public QueryResourceResultPusher( + HttpServletRequest req, + QueryLifecycle queryLifecycle, + ResourceIOReaderWriter io, + HttpServletResponse response + ) + { + super( + response, + QueryResource.this.jsonMapper, + QueryResource.this.responseContextConfig, + QueryResource.this.selfNode, + QueryResource.this.counter, + queryLifecycle.getQueryId(), + MediaType.valueOf(io.getResponseWriter().getResponseType()) + ); + this.req = req; + this.queryLifecycle = queryLifecycle; + this.io = io; + } + + @Override + public ResultsWriter start() + { + return new ResultsWriter() + { + @Override + public QueryResponse<Object> start(HttpServletResponse response) + { + final QueryResponse<Object> queryResponse = queryLifecycle.execute(); + final ResponseContext responseContext = queryResponse.getResponseContext(); + final String prevEtag = getPreviousEtag(req); + + if (prevEtag != null && prevEtag.equals(responseContext.getEntityTag())) { + queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), -1); + counter.incrementSuccess(); + 
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED); + return null; + } + + return queryResponse; + } + + @Override + public Writer makeWriter(OutputStream out) throws IOException + { + final ObjectWriter objectWriter = queryLifecycle.newOutputWriter(io); + final SequenceWriter sequenceWriter = objectWriter.writeValuesAsArray(out); + return new Writer() + { + + @Override + public void writeResponseStart() + { + // Do nothing + } + + @Override + public void writeRow(Object obj) throws IOException + { + sequenceWriter.write(obj); + } + + @Override + public void writeResponseEnd() + { + // Do nothing + } + + @Override + public void close() throws IOException + { + sequenceWriter.close(); + } + }; + } + + @Override + public void recordSuccess(long numBytes) Review Comment: `recordSuccess` and `recordFailure` should probably be abstract methods on `ResultPusher` itself rather than `ResultWriter`, as they feel like top-level behaviour. Also in the two impls, they affect fields of the pusher itself. I was also wondering if we could avoid passing the `QueryMetricCounter` to the `ResultPusher`, and just update those counts via `recordSuccess` and `recordFailure` but that might be uglier as we would have to handle all possible error codes here. ########## server/src/main/java/org/apache/druid/server/ResultPusher.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.QueryException; +import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.server.security.ForbiddenException; + +import javax.annotation.Nullable; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.MediaType; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; + +public abstract class ResultPusher +{ + private static final Logger log = new Logger(ResultPusher.class); + + private final HttpServletResponse response; + private final String queryId; + private final ObjectMapper jsonMapper; + private final ResponseContextConfig responseContextConfig; + private final DruidNode selfNode; + private final QueryResource.QueryMetricCounter counter; + private final MediaType contentType; + + private StreamingHttpResponseAccumulator accumulator = null; + + public ResultPusher( + HttpServletResponse response, + ObjectMapper jsonMapper, + ResponseContextConfig responseContextConfig, + DruidNode selfNode, + QueryResource.QueryMetricCounter counter, + String queryId, + MediaType contentType + ) + { + this.response = response; + this.queryId = queryId; + this.jsonMapper = jsonMapper; + this.responseContextConfig = responseContextConfig; + this.selfNode = selfNode; + this.counter = counter; + 
this.contentType = contentType; + } + + /** + * Starts a ResultsWriter, which encapsulates logic to run the query, serialize it and also report success/failure + * <p> + * This response must not be null. The job of this ResultsWriter is largely to provide lifecycle management to + * the query running and reporting, so this object must never be null. Also, this start() method should do as little + * work as possible. + * + * @return + */ + public abstract ResultsWriter start(); + + public abstract void writeException(Exception e, OutputStream out) throws IOException; + + public void push() + { + response.setHeader(QueryResource.QUERY_ID_RESPONSE_HEADER, queryId); + + ResultsWriter resultsWriter = null; + try { + resultsWriter = start(); + + + final QueryResponse<Object> queryResponse = resultsWriter.start(response); + if (queryResponse == null) { + // It's already been handled... + return; + } + + final Sequence<Object> results = queryResponse.getResults(); + + accumulator = new StreamingHttpResponseAccumulator( + ResultPusher.this.response, + queryId, + queryResponse.getResponseContext(), + jsonMapper, + responseContextConfig, + resultsWriter, + selfNode, + contentType + ); + + results.accumulate(null, accumulator); + accumulator.flush(); + + counter.incrementSuccess(); + accumulator.close(); + resultsWriter.recordSuccess(accumulator.getNumBytesSent()); + } + catch (QueryException e) { + handleQueryException(resultsWriter, e); + return; + } + catch (RuntimeException re) { + if (re instanceof ForbiddenException) { + // Forbidden exceptions are special, they get thrown instead of serialized. They happen before the response + // has been committed because the response is committed after results are returned. And, if we started + // returning results before a ForbiddenException gets thrown, that means that we've already leaked stuff + // that should not have been leaked. I.e. it means, we haven't validated the authorization early enough. 
+ if (response.isCommitted()) { + log.error(re, "Got a forbidden exception for query[%s] after the response was already committed.", queryId); + } + throw re; + } + handleQueryException(resultsWriter, new QueryInterruptedException(re)); + return; + } + catch (IOException ioEx) { + handleQueryException(resultsWriter, new QueryInterruptedException(ioEx)); + return; + } + finally { + if (accumulator != null) { + try { + accumulator.close(); + } + catch (IOException e) { + log.warn(e, "Suppressing exception closing accumulator for query[%s]", queryId); + } + } + if (resultsWriter == null) { + log.warn("resultsWriter was null for query[%s], work was maybe done in start() that shouldn't be.", queryId); + } else { + try { + resultsWriter.close(); + } + catch (IOException e) { + log.warn(e, "Suppressing exception closing accumulator for query[%s]", queryId); + } + } + } + } + + private void handleQueryException(ResultsWriter resultsWriter, QueryException e) + { + if (accumulator != null && accumulator.isInitialized()) { + // We already started sending a response when we got the error message. In this case we just give up + // and hope that the partial stream generates a meaningful failure message for our client. We could consider + // also throwing the exception body into the response to make it easier for the client to choke if it manages + // to parse a meaningful object out, but that's potentially an API change so we leave that as an exercise for + // the future. + + resultsWriter.recordFailure(e); + + // This case is always a failure because the error happened mid-stream of sending results back. 
Therefore, + // we do not believe that the response stream was actually useable + counter.incrementFailed(); + return; + } + + if (response.isCommitted()) { + QueryResource.NO_STACK_LOGGER.warn(e, "Response was committed without the accumulator writing anything!?"); + } + + final QueryException.FailType failType = e.getFailType(); + switch (failType) { + case USER_ERROR: + case UNAUTHORIZED: + case QUERY_RUNTIME_FAILURE: + case SERVER_ERROR: + case CANCELED: + counter.incrementInterrupted(); + break; + case CAPACITY_EXCEEDED: + case UNSUPPORTED: + counter.incrementFailed(); + break; + case TIMEOUT: + counter.incrementTimedOut(); + break; + case UNKNOWN: + log.warn( + e, + "Unknown errorCode[%s], support needs to be added for error handling.", + e.getErrorCode() + ); + counter.incrementFailed(); + } + final int responseStatus = failType.getExpectedStatus(); + + response.setStatus(responseStatus); + response.setHeader("Content-Type", contentType.toString()); + try (ServletOutputStream out = response.getOutputStream()) { + writeException(e, out); + } + catch (IOException ioException) { + log.warn( + ioException, + "Suppressing IOException thrown sending error response for query[%s]", + queryId + ); + } + + resultsWriter.recordFailure(e); + } + + public interface ResultsWriter extends Closeable + { + /** + * Runs the query and returns a ResultsWriter from running the query. Review Comment: ```suggestion * Runs the query and returns the QueryResponse from running the query. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
