This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c3fa6b4c22 Add an option to write a mid-flight exception as a 
response row (#18571)
2c3fa6b4c22 is described below

commit 2c3fa6b4c22d2d493bc3f8192431cfe897897784
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Thu Sep 25 19:33:17 2025 +0530

    Add an option to write a mid-flight exception as a response row (#18571)
    
    * Add an option to write exception as a row
    
    * remove unused import
---
 .../org/apache/druid/server/QueryResource.java     |  1 +
 .../QueryResourceQueryResultPusherFactory.java     |  3 +-
 .../org/apache/druid/server/QueryResultPusher.java | 29 +++++--
 .../org/apache/druid/server/QueryResourceTest.java | 99 ++++++++++++++++++++++
 .../apache/druid/server/QueryResultPusherTest.java |  4 +-
 .../sql/http/SqlResourceQueryResultPusher.java     |  3 +-
 6 files changed, 130 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java 
b/server/src/main/java/org/apache/druid/server/QueryResource.java
index 05939c83a84..80ea2895b97 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -84,6 +84,7 @@ public class QueryResource implements QueryCountStatsProvider
   public static final String ERROR_MESSAGE_TRAILER_HEADER = "X-Error-Message";
   public static final String RESPONSE_COMPLETE_TRAILER_HEADER = 
"X-Druid-Response-Complete";
   public static final String HEADER_ETAG = "ETag";
+  public static final String WRITE_EXCEPTION_BODY_AS_RESPONSE_ROW = 
"writeExceptionBodyAsResponseRow";
 
   protected final QueryLifecycleFactory queryLifecycleFactory;
   protected final ObjectMapper jsonMapper;
diff --git 
a/server/src/main/java/org/apache/druid/server/QueryResourceQueryResultPusherFactory.java
 
b/server/src/main/java/org/apache/druid/server/QueryResourceQueryResultPusherFactory.java
index 2eeb986ce49..5e403fc6201 100644
--- 
a/server/src/main/java/org/apache/druid/server/QueryResourceQueryResultPusherFactory.java
+++ 
b/server/src/main/java/org/apache/druid/server/QueryResourceQueryResultPusherFactory.java
@@ -105,7 +105,8 @@ public class QueryResourceQueryResultPusherFactory
           counter,
           queryLifecycle.getQueryId(),
           MediaType.valueOf(io.getResponseWriter().getResponseType()),
-          ImmutableMap.of()
+          ImmutableMap.of(),
+          queryLifecycle.getQuery().getContext()
       );
       this.req = req;
       this.queryLifecycle = queryLifecycle;
diff --git 
a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java 
b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
index 2ee9c0d5d96..f85bc3dee52 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
@@ -67,6 +67,7 @@ public abstract class QueryResultPusher
   private final QueryResource.QueryMetricCounter counter;
   private final MediaType contentType;
   private final Map<String, String> extraHeaders;
+  private final Map<String, Object> queryContext;
   private final Map<String, String> trailerFields;
 
   private StreamingHttpResponseAccumulator accumulator;
@@ -81,7 +82,8 @@ public abstract class QueryResultPusher
       QueryResource.QueryMetricCounter counter,
       String queryId,
       MediaType contentType,
-      Map<String, String> extraHeaders
+      Map<String, String> extraHeaders,
+      Map<String, Object> queryContext
   )
   {
     this.request = request;
@@ -92,6 +94,7 @@ public abstract class QueryResultPusher
     this.counter = counter;
     this.contentType = contentType;
     this.extraHeaders = extraHeaders;
+    this.queryContext = queryContext;
     this.trailerFields = new HashMap<>();
   }
 
@@ -254,11 +257,25 @@ public abstract class QueryResultPusher
       resultsWriter.recordFailure(e);
 
       if (accumulator != null && accumulator.isInitialized()) {
-        // We already started sending a response when we got the error 
message.  In this case we just give up
-        // and hope that the partial stream generates a meaningful failure 
message for our client.  We could consider
-        // also throwing the exception body into the response to make it 
easier for the client to choke if it manages
-        // to parse a meaningful object out, but that's potentially an API 
change so we leave that as an exercise for
-        // the future.
+        // We already started sending a response when we got the error 
message.  In this case we write the exception
+        // message as a row, assuming the caller (SqlResource, QueryResource 
or a custom endpoint) would be able to
+        // parse it and throw an exception on their side. It's assumed that if 
the caller is setting the
+        // WRITE_EXCEPTION_BODY_AS_RESPONSE_ROW context value, they are able 
to handle this kind of response. If it's
+        // not set, caller will continue to see a json parsing exception.
+        if (queryContext != null
+            && 
Boolean.parseBoolean(String.valueOf(queryContext.get(QueryResource.WRITE_EXCEPTION_BODY_AS_RESPONSE_ROW))))
 {
+          try {
+            accumulator.writer.writeRow(e);
+            accumulator.writer.writeResponseEnd();
+          }
+          catch (IOException ioException) {
+            log.warn(
+                ioException,
+                "Suppressing IOException thrown writing error response for 
query [%s]",
+                queryId
+            );
+          }
+        }
         trailerFields.put(QueryResource.ERROR_MESSAGE_TRAILER_HEADER, 
e.getMessage());
         trailerFields.put(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER, 
"false");
         return null;
diff --git 
a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java 
b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index 684c2c15734..16fcf562ccd 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -35,9 +35,11 @@ import com.google.inject.Key;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.error.ErrorResponse;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.Accumulator;
@@ -109,6 +111,7 @@ import javax.ws.rs.core.Response.Status;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -197,6 +200,22 @@ public class QueryResourceTest
       + "    \"context\": { \"priority\": -1 }"
       + "}";
 
+  private static final String SIMPLE_TIMESERIES_QUERY_WRITE_EXCEPTION_AS_ROW =
+      "{\n"
+      + "    \"queryType\": \"timeseries\",\n"
+      + "    \"dataSource\": \"mmx_metrics\",\n"
+      + "    \"granularity\": \"hour\",\n"
+      + "    \"intervals\": [\n"
+      + "      \"2014-12-17/2015-12-30\"\n"
+      + "    ],\n"
+      + "    \"aggregations\": [\n"
+      + "      {\n"
+      + "        \"type\": \"count\",\n"
+      + "        \"name\": \"rows\"\n"
+      + "      }\n"
+      + "    ],\n"
+      + "    \"context\": { \"writeExceptionBodyAsResponseRow\": \"true\" }"
+      + "}";
 
   private static final ServiceEmitter NOOP_SERVICE_EMITTER = new 
NoopServiceEmitter();
   private static final DruidNode DRUID_NODE = new DruidNode(
@@ -507,6 +526,86 @@ public class QueryResourceTest
     
Assert.assertEquals(fields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER), 
"false");
   }
 
+  @Test
+  public void testResponseWithMidFlightExceptions() throws IOException
+  {
+    queryResource = new QueryResource(
+        new QueryLifecycleFactory(
+            CONGLOMERATE,
+            new QuerySegmentWalker()
+            {
+              @Override
+              public <T> QueryRunner<T> getQueryRunnerForIntervals(
+                  Query<T> query,
+                  Iterable<Interval> intervals
+              )
+              {
+                return (queryPlus, responseContext) -> new Sequence<T>()
+                {
+                  @Override
+                  public <OutType> OutType accumulate(OutType initValue, 
Accumulator<OutType, T> accumulator)
+                  {
+                    accumulator.accumulate(null,
+                                           (T) new 
TimeBoundaryResultValue(ImmutableMap.<String, Object>of("maxTime", 
DateTimes.of("2014-08-02")))
+                    );
+                    throw InvalidInput.exception("mid-flight exception");
+                  }
+
+                  @Override
+                  public <OutType> Yielder<OutType> toYielder(
+                      OutType initValue,
+                      YieldingAccumulator<OutType, T> accumulator
+                  )
+                  {
+                    throw new UnsupportedOperationException("not implemented");
+                  }
+                };
+              }
+
+              @Override
+              public <T> QueryRunner<T> getQueryRunnerForSegments(
+                  Query<T> query,
+                  Iterable<SegmentDescriptor> specs
+              )
+              {
+                throw new UnsupportedOperationException();
+              }
+            },
+            new DefaultGenericQueryMetricsFactory(),
+            new NoopServiceEmitter(),
+            testRequestLogger,
+            new AuthConfig(),
+            NoopPolicyEnforcer.instance(),
+            AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+            Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
+        ),
+        jsonMapper,
+        queryScheduler,
+        null,
+        new QueryResourceQueryResultPusherFactory(
+            jsonMapper,
+            ResponseContextConfig.newConfig(true),
+            DRUID_NODE
+        ),
+        new ResourceIOReaderWriterFactory(jsonMapper, smileMapper)
+    );
+
+    expectPermissiveHappyPathAuth();
+
+    MockHttpServletResponse response = 
expectAsyncRequestFlow(testServletRequest,
+                                                              
SIMPLE_TIMESERIES_QUERY_WRITE_EXCEPTION_AS_ROW.getBytes(
+                                                                  
StandardCharsets.UTF_8),
+                                                              queryResource
+    );
+    String actualOutput = response.baos.toString(Charset.defaultCharset());
+    Assert.assertEquals(
+        
"[{\"maxTime\":\"2014-08-02T00:00:00.000Z\"},{\"error\":\"druidException\","
+        + 
"\"errorCode\":\"invalidInput\",\"persona\":\"USER\",\"category\":\"INVALID_INPUT\","
+        + "\"errorMessage\":\"mid-flight exception\",\"context\":{}}]",
+        actualOutput
+    );
+  }
+
   @Test
   public void 
testResponseContextContainsMissingSegments_whenLastSegmentIsMissing() throws 
IOException
   {
diff --git 
a/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java 
b/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java
index 00836639202..65089c2f231 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java
@@ -33,6 +33,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.ResponseBuilder;
 
 import java.io.OutputStream;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -114,7 +115,8 @@ public class QueryResultPusherTest
         counter,
         queryId,
         contentType,
-        extraHeaders)
+        extraHeaders,
+        Collections.emptyMap())
     {
 
       @Override
diff --git 
a/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java 
b/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java
index 1cdbbf3a723..89620f88516 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java
@@ -70,7 +70,8 @@ class SqlResourceQueryResultPusher extends QueryResultPusher
         SqlResource.QUERY_METRIC_COUNTER,
         stmt.sqlQueryId(),
         MediaType.APPLICATION_JSON_TYPE,
-        headers
+        headers,
+        sqlQuery.getContext()
     );
     this.serverConfig = serverConfig;
     this.jsonMapper = jsonMapper;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to