This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new df680bab05d Introduced `includeTrailerHeader` to enable 
`TrailerHeaders` in response (#16672)
df680bab05d is described below

commit df680bab05de46e46c5b3ec89a42648685456702
Author: Vivek Dhiman <[email protected]>
AuthorDate: Sat Sep 21 01:59:37 2024 -0700

    Introduced `includeTrailerHeader` to enable `TrailerHeaders` in response (#16672)
    
    Introduced includeTrailerHeader to enable TrailerHeaders in response.
    If enabled, a header X-Error-Message will be added to indicate reasons for
    partial results.
---
 .../org/apache/druid/server/QueryResource.java     |   2 +
 .../org/apache/druid/server/QueryResultPusher.java |  27 +++-
 .../org/apache/druid/server/QueryResourceTest.java | 159 ++++++++++++++++++++-
 3 files changed, 185 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java 
b/server/src/main/java/org/apache/druid/server/QueryResource.java
index 61696dd5cec..06104000b1c 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -94,6 +94,8 @@ public class QueryResource implements QueryCountStatsProvider
   public static final String HEADER_RESPONSE_CONTEXT = 
"X-Druid-Response-Context";
   public static final String HEADER_IF_NONE_MATCH = "If-None-Match";
   public static final String QUERY_ID_RESPONSE_HEADER = "X-Druid-Query-Id";
+  public static final String ERROR_MESSAGE_TRAILER_HEADER = "X-Error-Message";
+  public static final String RESPONSE_COMPLETE_TRAILER_HEADER = 
"X-Druid-Response-Complete";
   public static final String HEADER_ETAG = "ETag";
 
   protected final QueryLifecycleFactory queryLifecycleFactory;
diff --git 
a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java 
b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
index 074beb545b4..710c8ccc919 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
@@ -38,6 +38,8 @@ import 
org.apache.druid.query.TruncatedResponseContextException;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.ForbiddenException;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
 
 import javax.annotation.Nullable;
 import javax.servlet.AsyncContext;
@@ -54,6 +56,7 @@ import java.util.Map;
 public abstract class QueryResultPusher
 {
   private static final Logger log = new Logger(QueryResultPusher.class);
+  protected static final String RESULT_TRAILER_HEADERS = 
QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER;
 
   private final HttpServletRequest request;
   private final String queryId;
@@ -63,6 +66,7 @@ public abstract class QueryResultPusher
   private final QueryResource.QueryMetricCounter counter;
   private final MediaType contentType;
   private final Map<String, String> extraHeaders;
+  private final HttpFields trailerFields;
 
   private StreamingHttpResponseAccumulator accumulator;
   private AsyncContext asyncContext;
@@ -87,6 +91,7 @@ public abstract class QueryResultPusher
     this.counter = counter;
     this.contentType = contentType;
     this.extraHeaders = extraHeaders;
+    this.trailerFields = new HttpFields();
   }
 
   /**
@@ -120,7 +125,9 @@ public abstract class QueryResultPusher
 
       final Response.ResponseBuilder startResponse = resultsWriter.start();
       if (startResponse != null) {
-        startResponse.header(QueryResource.QUERY_ID_RESPONSE_HEADER, queryId);
+        startResponse.header(QueryResource.QUERY_ID_RESPONSE_HEADER, queryId)
+                     .header(HttpHeader.TRAILER.toString(), 
RESULT_TRAILER_HEADERS);
+
         for (Map.Entry<String, String> entry : extraHeaders.entrySet()) {
           startResponse.header(entry.getKey(), entry.getValue());
         }
@@ -143,6 +150,17 @@ public abstract class QueryResultPusher
         response.setHeader(entry.getKey(), entry.getValue());
       }
 
+      if (response instanceof org.eclipse.jetty.server.Response) {
+        org.eclipse.jetty.server.Response jettyResponse = 
(org.eclipse.jetty.server.Response) response;
+
+        jettyResponse.setHeader(HttpHeader.TRAILER.toString(), 
RESULT_TRAILER_HEADERS);
+        jettyResponse.setTrailers(() -> trailerFields);
+
+        // Start with complete status
+
+        trailerFields.put(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER, 
"true");
+      }
+
       accumulator = new 
StreamingHttpResponseAccumulator(queryResponse.getResponseContext(), 
resultsWriter);
 
       results.accumulate(null, accumulator);
@@ -223,6 +241,8 @@ public abstract class QueryResultPusher
         // also throwing the exception body into the response to make it 
easier for the client to choke if it manages
         // to parse a meaningful object out, but that's potentially an API 
change so we leave that as an exercise for
         // the future.
+        trailerFields.put(QueryResource.ERROR_MESSAGE_TRAILER_HEADER, 
e.getMessage());
+        trailerFields.put(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER, 
"false");
         return null;
       }
     }
@@ -418,6 +438,11 @@ public abstract class QueryResultPusher
         response.setHeader(QueryResource.HEADER_RESPONSE_CONTEXT, 
serializationResult.getResult());
         response.setContentType(contentType.toString());
 
+        if (response instanceof org.eclipse.jetty.server.Response) {
+          org.eclipse.jetty.server.Response jettyResponse = 
(org.eclipse.jetty.server.Response) response;
+          jettyResponse.setTrailers(() -> trailerFields);
+        }
+
         try {
           out = new CountingOutputStream(response.getOutputStream());
           writer = resultsWriter.makeWriter(out);
diff --git 
a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java 
b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index 32c26edffee..4d827a008f3 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -39,8 +39,13 @@ import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Accumulator;
 import org.apache.druid.java.util.common.guava.LazySequence;
+import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.BadJsonQueryException;
@@ -65,6 +70,7 @@ import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.log.TestRequestLogger;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.mocks.ExceptionalInputStream;
+import org.apache.druid.server.mocks.MockAsyncContext;
 import org.apache.druid.server.mocks.MockHttpServletRequest;
 import org.apache.druid.server.mocks.MockHttpServletResponse;
 import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
@@ -81,6 +87,11 @@ import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.server.security.Resource;
 import org.apache.http.HttpStatus;
+import org.easymock.EasyMock;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.server.HttpChannel;
+import org.eclipse.jetty.server.HttpOutput;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
 import org.joda.time.Interval;
@@ -258,8 +269,8 @@ public class QueryResourceTest
   @Test
   public void testGoodQueryWithQueryConfigOverrideDefault() throws IOException
   {
-    String overrideConfigKey = "priority";
-    String overrideConfigValue = "678";
+    final String overrideConfigKey = "priority";
+    final String overrideConfigValue = "678";
     DefaultQueryConfig overrideConfig = new 
DefaultQueryConfig(ImmutableMap.of(overrideConfigKey, overrideConfigValue));
     queryResource = new QueryResource(
         new QueryLifecycleFactory(
@@ -381,6 +392,125 @@ public class QueryResourceTest
     );
   }
 
+  @Test
+  public void testResponseWithIncludeTrailerHeader() throws IOException
+  {
+    queryResource = new QueryResource(
+      new QueryLifecycleFactory(
+        WAREHOUSE,
+        new QuerySegmentWalker()
+        {
+          @Override
+          public <T> QueryRunner<T> getQueryRunnerForIntervals(
+              Query<T> query,
+              Iterable<Interval> intervals
+          )
+          {
+            return (queryPlus, responseContext) -> new Sequence<T>() {
+              @Override
+              public <OutType> OutType accumulate(OutType initValue, 
Accumulator<OutType, T> accumulator)
+              {
+                if (accumulator instanceof 
QueryResultPusher.StreamingHttpResponseAccumulator) {
+                  try {
+                    ((QueryResultPusher.StreamingHttpResponseAccumulator) 
accumulator).flush(); // initialized
+                  }
+                  catch (IOException ignore) {
+                  }
+                }
+
+                throw new QueryTimeoutException();
+              }
+
+              @Override
+              public <OutType> Yielder<OutType> toYielder(OutType initValue, 
YieldingAccumulator<OutType, T> accumulator)
+              {
+                return Yielders.done(initValue, null);
+              }
+            };
+          }
+
+          @Override
+          public <T> QueryRunner<T> getQueryRunnerForSegments(
+              Query<T> query,
+              Iterable<SegmentDescriptor> specs
+          )
+          {
+            throw new UnsupportedOperationException();
+          }
+        },
+        new DefaultGenericQueryMetricsFactory(),
+        new NoopServiceEmitter(),
+        testRequestLogger,
+        new AuthConfig(),
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
+      ),
+      jsonMapper,
+      smileMapper,
+      queryScheduler,
+      new AuthConfig(),
+      null,
+      ResponseContextConfig.newConfig(true),
+      DRUID_NODE
+    );
+
+    expectPermissiveHappyPathAuth();
+
+    org.eclipse.jetty.server.Response response = 
this.jettyResponseforRequest(testServletRequest);
+    Assert.assertNull(queryResource.doPost(new ByteArrayInputStream(
+        SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)),
+        null /*pretty*/,
+        testServletRequest));
+    Assert.assertTrue(response.containsHeader(HttpHeader.TRAILER.toString()));
+    Assert.assertEquals(response.getHeader(HttpHeader.TRAILER.toString()), 
QueryResultPusher.RESULT_TRAILER_HEADERS);
+
+    final HttpFields fields = response.getTrailers().get();
+    
Assert.assertTrue(fields.containsKey(QueryResource.ERROR_MESSAGE_TRAILER_HEADER));
+    Assert.assertEquals(fields.get(QueryResource.ERROR_MESSAGE_TRAILER_HEADER),
+        "Query did not complete within configured timeout period. You can 
increase query timeout or tune the performance of query.");
+
+    
Assert.assertTrue(fields.containsKey(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER));
+    
Assert.assertEquals(fields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER), 
"false");
+  }
+
+  @Test
+  public void testSuccessResponseWithTrailerHeader() throws IOException
+  {
+    queryResource = new QueryResource(
+        new QueryLifecycleFactory(
+            WAREHOUSE,
+            TEST_SEGMENT_WALKER,
+            new DefaultGenericQueryMetricsFactory(),
+            new NoopServiceEmitter(),
+            testRequestLogger,
+            new AuthConfig(),
+            AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+            Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
+        ),
+        jsonMapper,
+        smileMapper,
+        queryScheduler,
+        new AuthConfig(),
+        null,
+        ResponseContextConfig.newConfig(true),
+        DRUID_NODE
+    );
+
+    expectPermissiveHappyPathAuth();
+
+    org.eclipse.jetty.server.Response response = 
this.jettyResponseforRequest(testServletRequest);
+    Assert.assertNull(queryResource.doPost(new ByteArrayInputStream(
+                                               
SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)),
+                                           null /*pretty*/,
+                                           testServletRequest));
+    Assert.assertTrue(response.containsHeader(HttpHeader.TRAILER.toString()));
+
+    final HttpFields fields = response.getTrailers().get();
+    
Assert.assertFalse(fields.containsKey(QueryResource.ERROR_MESSAGE_TRAILER_HEADER));
+
+    
Assert.assertTrue(fields.containsKey(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER));
+    
Assert.assertEquals(fields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER), 
"true");
+  }
 
   @Test
   public void testQueryThrowsRuntimeExceptionFromLifecycleExecute() throws 
IOException
@@ -1459,4 +1589,29 @@ public class QueryResourceTest
   {
     return queryResource.doPost(new ByteArrayInputStream(bytes), null, req);
   }
+
+  private org.eclipse.jetty.server.Response 
jettyResponseforRequest(MockHttpServletRequest req) throws IOException
+  {
+    HttpChannel channelMock = EasyMock.mock(HttpChannel.class);
+    HttpOutput outputMock = EasyMock.mock(HttpOutput.class);
+    org.eclipse.jetty.server.Response response = new 
org.eclipse.jetty.server.Response(channelMock, outputMock);
+
+    EasyMock.expect(channelMock.isSendError()).andReturn(false);
+    EasyMock.expect(channelMock.isCommitted()).andReturn(true);
+
+    outputMock.close();
+    EasyMock.expectLastCall().andVoid();
+
+    outputMock.write(EasyMock.anyObject(byte[].class), EasyMock.anyInt(), 
EasyMock.anyInt());
+    EasyMock.expectLastCall().andVoid();
+
+    EasyMock.replay(outputMock, channelMock);
+
+    req.newAsyncContext(() -> {
+      final MockAsyncContext retVal = new MockAsyncContext();
+      retVal.response = response;
+      return retVal;
+    });
+    return response;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to