This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2c3fa6b4c22 Add an option to write a mid-flight exception as a response row (#18571)
2c3fa6b4c22 is described below
commit 2c3fa6b4c22d2d493bc3f8192431cfe897897784
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Thu Sep 25 19:33:17 2025 +0530
Add an option to write a mid-flight exception as a response row (#18571)
* Add an option to write exception as a row
* remove unused import
---
.../org/apache/druid/server/QueryResource.java | 1 +
.../QueryResourceQueryResultPusherFactory.java | 3 +-
.../org/apache/druid/server/QueryResultPusher.java | 29 +++++--
.../org/apache/druid/server/QueryResourceTest.java | 99 ++++++++++++++++++++++
.../apache/druid/server/QueryResultPusherTest.java | 4 +-
.../sql/http/SqlResourceQueryResultPusher.java | 3 +-
6 files changed, 130 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java
b/server/src/main/java/org/apache/druid/server/QueryResource.java
index 05939c83a84..80ea2895b97 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -84,6 +84,7 @@ public class QueryResource implements QueryCountStatsProvider
public static final String ERROR_MESSAGE_TRAILER_HEADER = "X-Error-Message";
public static final String RESPONSE_COMPLETE_TRAILER_HEADER =
"X-Druid-Response-Complete";
public static final String HEADER_ETAG = "ETag";
+ public static final String WRITE_EXCEPTION_BODY_AS_RESPONSE_ROW =
"writeExceptionBodyAsResponseRow";
protected final QueryLifecycleFactory queryLifecycleFactory;
protected final ObjectMapper jsonMapper;
diff --git a/server/src/main/java/org/apache/druid/server/QueryResourceQueryResultPusherFactory.java b/server/src/main/java/org/apache/druid/server/QueryResourceQueryResultPusherFactory.java
index 2eeb986ce49..5e403fc6201 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResourceQueryResultPusherFactory.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResourceQueryResultPusherFactory.java
@@ -105,7 +105,8 @@ public class QueryResourceQueryResultPusherFactory
counter,
queryLifecycle.getQueryId(),
MediaType.valueOf(io.getResponseWriter().getResponseType()),
- ImmutableMap.of()
+ ImmutableMap.of(),
+ queryLifecycle.getQuery().getContext()
);
this.req = req;
this.queryLifecycle = queryLifecycle;
diff --git a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
index 2ee9c0d5d96..f85bc3dee52 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
@@ -67,6 +67,7 @@ public abstract class QueryResultPusher
private final QueryResource.QueryMetricCounter counter;
private final MediaType contentType;
private final Map<String, String> extraHeaders;
+ private final Map<String, Object> queryContext;
private final Map<String, String> trailerFields;
private StreamingHttpResponseAccumulator accumulator;
@@ -81,7 +82,8 @@ public abstract class QueryResultPusher
QueryResource.QueryMetricCounter counter,
String queryId,
MediaType contentType,
- Map<String, String> extraHeaders
+ Map<String, String> extraHeaders,
+ Map<String, Object> queryContext
)
{
this.request = request;
@@ -92,6 +94,7 @@ public abstract class QueryResultPusher
this.counter = counter;
this.contentType = contentType;
this.extraHeaders = extraHeaders;
+ this.queryContext = queryContext;
this.trailerFields = new HashMap<>();
}
@@ -254,11 +257,25 @@ public abstract class QueryResultPusher
resultsWriter.recordFailure(e);
if (accumulator != null && accumulator.isInitialized()) {
- // We already started sending a response when we got the error
message. In this case we just give up
- // and hope that the partial stream generates a meaningful failure
message for our client. We could consider
- // also throwing the exception body into the response to make it
easier for the client to choke if it manages
- // to parse a meaningful object out, but that's potentially an API
change so we leave that as an exercise for
- // the future.
+ // We already started sending a response when we got the error
message. In this case we write the exception
+ // message as a row, assuming the caller (SqlResource, QueryResource
or a custom endpoint) would be able to
+ // parse it and throw an exception on their side. It's assumed that if
the caller is setting the
+ // WRITE_EXCEPTION_BODY_AS_RESPONSE_ROW context value, they are able
to handle this kind of response. If it's
+ // not set, caller will continue to see a json parsing exception.
+ if (queryContext != null
+ &&
Boolean.parseBoolean(String.valueOf(queryContext.get(QueryResource.WRITE_EXCEPTION_BODY_AS_RESPONSE_ROW))))
{
+ try {
+ accumulator.writer.writeRow(e);
+ accumulator.writer.writeResponseEnd();
+ }
+ catch (IOException ioException) {
+ log.warn(
+ ioException,
+ "Suppressing IOException thrown writing error response for
query [%s]",
+ queryId
+ );
+ }
+ }
trailerFields.put(QueryResource.ERROR_MESSAGE_TRAILER_HEADER,
e.getMessage());
trailerFields.put(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER,
"false");
return null;
diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index 684c2c15734..16fcf562ccd 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -35,9 +35,11 @@ import com.google.inject.Key;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.error.ErrorResponse;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Accumulator;
@@ -109,6 +111,7 @@ import javax.ws.rs.core.Response.Status;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -197,6 +200,22 @@ public class QueryResourceTest
+ " \"context\": { \"priority\": -1 }"
+ "}";
+ private static final String SIMPLE_TIMESERIES_QUERY_WRITE_EXCEPTION_AS_ROW =
+ "{\n"
+ + " \"queryType\": \"timeseries\",\n"
+ + " \"dataSource\": \"mmx_metrics\",\n"
+ + " \"granularity\": \"hour\",\n"
+ + " \"intervals\": [\n"
+ + " \"2014-12-17/2015-12-30\"\n"
+ + " ],\n"
+ + " \"aggregations\": [\n"
+ + " {\n"
+ + " \"type\": \"count\",\n"
+ + " \"name\": \"rows\"\n"
+ + " }\n"
+ + " ],\n"
+ + " \"context\": { \"writeExceptionBodyAsResponseRow\": \"true\" }"
+ + "}";
private static final ServiceEmitter NOOP_SERVICE_EMITTER = new
NoopServiceEmitter();
private static final DruidNode DRUID_NODE = new DruidNode(
@@ -507,6 +526,86 @@ public class QueryResourceTest
Assert.assertEquals(fields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER),
"false");
}
+ @Test
+ public void testResponseWithMidFlightExceptions() throws IOException
+ {
+ queryResource = new QueryResource(
+ new QueryLifecycleFactory(
+ CONGLOMERATE,
+ new QuerySegmentWalker()
+ {
+ @Override
+ public <T> QueryRunner<T> getQueryRunnerForIntervals(
+ Query<T> query,
+ Iterable<Interval> intervals
+ )
+ {
+ return (queryPlus, responseContext) -> new Sequence<T>()
+ {
+ @Override
+ public <OutType> OutType accumulate(OutType initValue,
Accumulator<OutType, T> accumulator)
+ {
+ accumulator.accumulate(null,
+ (T) new
TimeBoundaryResultValue(ImmutableMap.<String, Object>of("maxTime",
DateTimes.of("2014-08-02")))
+ );
+ throw InvalidInput.exception("mid-flight exception");
+ }
+
+ @Override
+ public <OutType> Yielder<OutType> toYielder(
+ OutType initValue,
+ YieldingAccumulator<OutType, T> accumulator
+ )
+ {
+ throw new UnsupportedOperationException("not implemented");
+ }
+ };
+ }
+
+ @Override
+ public <T> QueryRunner<T> getQueryRunnerForSegments(
+ Query<T> query,
+ Iterable<SegmentDescriptor> specs
+ )
+ {
+ throw new UnsupportedOperationException();
+ }
+ },
+ new DefaultGenericQueryMetricsFactory(),
+ new NoopServiceEmitter(),
+ testRequestLogger,
+ new AuthConfig(),
+ NoopPolicyEnforcer.instance(),
+ AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+ Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
+ ),
+ jsonMapper,
+ queryScheduler,
+ null,
+ new QueryResourceQueryResultPusherFactory(
+ jsonMapper,
+ ResponseContextConfig.newConfig(true),
+ DRUID_NODE
+ ),
+ new ResourceIOReaderWriterFactory(jsonMapper, smileMapper)
+ );
+
+ expectPermissiveHappyPathAuth();
+
+ MockHttpServletResponse response =
expectAsyncRequestFlow(testServletRequest,
+
SIMPLE_TIMESERIES_QUERY_WRITE_EXCEPTION_AS_ROW.getBytes(
+
StandardCharsets.UTF_8),
+ queryResource
+ );
+ String actualOutput = response.baos.toString(Charset.defaultCharset());
+ Assert.assertEquals(
+
"[{\"maxTime\":\"2014-08-02T00:00:00.000Z\"},{\"error\":\"druidException\","
+ +
"\"errorCode\":\"invalidInput\",\"persona\":\"USER\",\"category\":\"INVALID_INPUT\","
+ + "\"errorMessage\":\"mid-flight exception\",\"context\":{}}]",
+ actualOutput
+ );
+ }
+
@Test
public void
testResponseContextContainsMissingSegments_whenLastSegmentIsMissing() throws
IOException
{
diff --git a/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java b/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java
index 00836639202..65089c2f231 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java
@@ -33,6 +33,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.ResponseBuilder;
import java.io.OutputStream;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -114,7 +115,8 @@ public class QueryResultPusherTest
counter,
queryId,
contentType,
- extraHeaders)
+ extraHeaders,
+ Collections.emptyMap())
{
@Override
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java
index 1cdbbf3a723..89620f88516 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java
@@ -70,7 +70,8 @@ class SqlResourceQueryResultPusher extends QueryResultPusher
SqlResource.QUERY_METRIC_COUNTER,
stmt.sqlQueryId(),
MediaType.APPLICATION_JSON_TYPE,
- headers
+ headers,
+ sqlQuery.getContext()
);
this.serverConfig = serverConfig;
this.jsonMapper = jsonMapper;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]