This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new df680bab05d Introduced `includeTrailerHeader` to enable
`TrailerHeaders` in response (#16672)
df680bab05d is described below
commit df680bab05de46e46c5b3ec89a42648685456702
Author: Vivek Dhiman <[email protected]>
AuthorDate: Sat Sep 21 01:59:37 2024 -0700
Introduced `includeTrailerHeader` to enable `TrailerHeaders` in response
(#16672)
Introduced includeTrailerHeader to enable TrailerHeaders in response.
If enabled, a header X-Error-Message will be added to indicate reasons for
partial results.
---
.../org/apache/druid/server/QueryResource.java | 2 +
.../org/apache/druid/server/QueryResultPusher.java | 27 +++-
.../org/apache/druid/server/QueryResourceTest.java | 159 ++++++++++++++++++++-
3 files changed, 185 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java
b/server/src/main/java/org/apache/druid/server/QueryResource.java
index 61696dd5cec..06104000b1c 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -94,6 +94,8 @@ public class QueryResource implements QueryCountStatsProvider
public static final String HEADER_RESPONSE_CONTEXT =
"X-Druid-Response-Context";
public static final String HEADER_IF_NONE_MATCH = "If-None-Match";
public static final String QUERY_ID_RESPONSE_HEADER = "X-Druid-Query-Id";
+ public static final String ERROR_MESSAGE_TRAILER_HEADER = "X-Error-Message";
+ public static final String RESPONSE_COMPLETE_TRAILER_HEADER =
"X-Druid-Response-Complete";
public static final String HEADER_ETAG = "ETag";
protected final QueryLifecycleFactory queryLifecycleFactory;
diff --git
a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
index 074beb545b4..710c8ccc919 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
@@ -38,6 +38,8 @@ import
org.apache.druid.query.TruncatedResponseContextException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.ForbiddenException;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
@@ -54,6 +56,7 @@ import java.util.Map;
public abstract class QueryResultPusher
{
private static final Logger log = new Logger(QueryResultPusher.class);
+ protected static final String RESULT_TRAILER_HEADERS =
QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER;
private final HttpServletRequest request;
private final String queryId;
@@ -63,6 +66,7 @@ public abstract class QueryResultPusher
private final QueryResource.QueryMetricCounter counter;
private final MediaType contentType;
private final Map<String, String> extraHeaders;
+ private final HttpFields trailerFields;
private StreamingHttpResponseAccumulator accumulator;
private AsyncContext asyncContext;
@@ -87,6 +91,7 @@ public abstract class QueryResultPusher
this.counter = counter;
this.contentType = contentType;
this.extraHeaders = extraHeaders;
+ this.trailerFields = new HttpFields();
}
/**
@@ -120,7 +125,9 @@ public abstract class QueryResultPusher
final Response.ResponseBuilder startResponse = resultsWriter.start();
if (startResponse != null) {
- startResponse.header(QueryResource.QUERY_ID_RESPONSE_HEADER, queryId);
+ startResponse.header(QueryResource.QUERY_ID_RESPONSE_HEADER, queryId)
+ .header(HttpHeader.TRAILER.toString(),
RESULT_TRAILER_HEADERS);
+
for (Map.Entry<String, String> entry : extraHeaders.entrySet()) {
startResponse.header(entry.getKey(), entry.getValue());
}
@@ -143,6 +150,17 @@ public abstract class QueryResultPusher
response.setHeader(entry.getKey(), entry.getValue());
}
+ if (response instanceof org.eclipse.jetty.server.Response) {
+ org.eclipse.jetty.server.Response jettyResponse =
(org.eclipse.jetty.server.Response) response;
+
+ jettyResponse.setHeader(HttpHeader.TRAILER.toString(),
RESULT_TRAILER_HEADERS);
+ jettyResponse.setTrailers(() -> trailerFields);
+
+ // Start with complete status
+
+ trailerFields.put(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER,
"true");
+ }
+
accumulator = new
StreamingHttpResponseAccumulator(queryResponse.getResponseContext(),
resultsWriter);
results.accumulate(null, accumulator);
@@ -223,6 +241,8 @@ public abstract class QueryResultPusher
// also throwing the exception body into the response to make it
easier for the client to choke if it manages
// to parse a meaningful object out, but that's potentially an API
change so we leave that as an exercise for
// the future.
+ trailerFields.put(QueryResource.ERROR_MESSAGE_TRAILER_HEADER,
e.getMessage());
+ trailerFields.put(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER,
"false");
return null;
}
}
@@ -418,6 +438,11 @@ public abstract class QueryResultPusher
response.setHeader(QueryResource.HEADER_RESPONSE_CONTEXT,
serializationResult.getResult());
response.setContentType(contentType.toString());
+ if (response instanceof org.eclipse.jetty.server.Response) {
+ org.eclipse.jetty.server.Response jettyResponse =
(org.eclipse.jetty.server.Response) response;
+ jettyResponse.setTrailers(() -> trailerFields);
+ }
+
try {
out = new CountingOutputStream(response.getOutputStream());
writer = resultsWriter.makeWriter(out);
diff --git
a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index 32c26edffee..4d827a008f3 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -39,8 +39,13 @@ import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.LazySequence;
+import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BadJsonQueryException;
@@ -65,6 +70,7 @@ import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.log.TestRequestLogger;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.mocks.ExceptionalInputStream;
+import org.apache.druid.server.mocks.MockAsyncContext;
import org.apache.druid.server.mocks.MockHttpServletRequest;
import org.apache.druid.server.mocks.MockHttpServletResponse;
import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
@@ -81,6 +87,11 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource;
import org.apache.http.HttpStatus;
+import org.easymock.EasyMock;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.server.HttpChannel;
+import org.eclipse.jetty.server.HttpOutput;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
@@ -258,8 +269,8 @@ public class QueryResourceTest
@Test
public void testGoodQueryWithQueryConfigOverrideDefault() throws IOException
{
- String overrideConfigKey = "priority";
- String overrideConfigValue = "678";
+ final String overrideConfigKey = "priority";
+ final String overrideConfigValue = "678";
DefaultQueryConfig overrideConfig = new
DefaultQueryConfig(ImmutableMap.of(overrideConfigKey, overrideConfigValue));
queryResource = new QueryResource(
new QueryLifecycleFactory(
@@ -381,6 +392,125 @@ public class QueryResourceTest
);
}
+ @Test
+ public void testResponseWithIncludeTrailerHeader() throws IOException
+ {
+ queryResource = new QueryResource(
+ new QueryLifecycleFactory(
+ WAREHOUSE,
+ new QuerySegmentWalker()
+ {
+ @Override
+ public <T> QueryRunner<T> getQueryRunnerForIntervals(
+ Query<T> query,
+ Iterable<Interval> intervals
+ )
+ {
+ return (queryPlus, responseContext) -> new Sequence<T>() {
+ @Override
+ public <OutType> OutType accumulate(OutType initValue,
Accumulator<OutType, T> accumulator)
+ {
+ if (accumulator instanceof
QueryResultPusher.StreamingHttpResponseAccumulator) {
+ try {
+ ((QueryResultPusher.StreamingHttpResponseAccumulator)
accumulator).flush(); // initialized
+ }
+ catch (IOException ignore) {
+ }
+ }
+
+ throw new QueryTimeoutException();
+ }
+
+ @Override
+ public <OutType> Yielder<OutType> toYielder(OutType initValue,
YieldingAccumulator<OutType, T> accumulator)
+ {
+ return Yielders.done(initValue, null);
+ }
+ };
+ }
+
+ @Override
+ public <T> QueryRunner<T> getQueryRunnerForSegments(
+ Query<T> query,
+ Iterable<SegmentDescriptor> specs
+ )
+ {
+ throw new UnsupportedOperationException();
+ }
+ },
+ new DefaultGenericQueryMetricsFactory(),
+ new NoopServiceEmitter(),
+ testRequestLogger,
+ new AuthConfig(),
+ AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+ Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
+ ),
+ jsonMapper,
+ smileMapper,
+ queryScheduler,
+ new AuthConfig(),
+ null,
+ ResponseContextConfig.newConfig(true),
+ DRUID_NODE
+ );
+
+ expectPermissiveHappyPathAuth();
+
+ org.eclipse.jetty.server.Response response =
this.jettyResponseforRequest(testServletRequest);
+ Assert.assertNull(queryResource.doPost(new ByteArrayInputStream(
+ SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)),
+ null /*pretty*/,
+ testServletRequest));
+ Assert.assertTrue(response.containsHeader(HttpHeader.TRAILER.toString()));
+ Assert.assertEquals(response.getHeader(HttpHeader.TRAILER.toString()),
QueryResultPusher.RESULT_TRAILER_HEADERS);
+
+ final HttpFields fields = response.getTrailers().get();
+
Assert.assertTrue(fields.containsKey(QueryResource.ERROR_MESSAGE_TRAILER_HEADER));
+ Assert.assertEquals(fields.get(QueryResource.ERROR_MESSAGE_TRAILER_HEADER),
+ "Query did not complete within configured timeout period. You can
increase query timeout or tune the performance of query.");
+
+
Assert.assertTrue(fields.containsKey(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER));
+
Assert.assertEquals(fields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER),
"false");
+ }
+
+ @Test
+ public void testSuccessResponseWithTrailerHeader() throws IOException
+ {
+ queryResource = new QueryResource(
+ new QueryLifecycleFactory(
+ WAREHOUSE,
+ TEST_SEGMENT_WALKER,
+ new DefaultGenericQueryMetricsFactory(),
+ new NoopServiceEmitter(),
+ testRequestLogger,
+ new AuthConfig(),
+ AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+ Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
+ ),
+ jsonMapper,
+ smileMapper,
+ queryScheduler,
+ new AuthConfig(),
+ null,
+ ResponseContextConfig.newConfig(true),
+ DRUID_NODE
+ );
+
+ expectPermissiveHappyPathAuth();
+
+ org.eclipse.jetty.server.Response response =
this.jettyResponseforRequest(testServletRequest);
+ Assert.assertNull(queryResource.doPost(new ByteArrayInputStream(
+
SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)),
+ null /*pretty*/,
+ testServletRequest));
+ Assert.assertTrue(response.containsHeader(HttpHeader.TRAILER.toString()));
+
+ final HttpFields fields = response.getTrailers().get();
+
Assert.assertFalse(fields.containsKey(QueryResource.ERROR_MESSAGE_TRAILER_HEADER));
+
+
Assert.assertTrue(fields.containsKey(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER));
+
Assert.assertEquals(fields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER),
"true");
+ }
@Test
public void testQueryThrowsRuntimeExceptionFromLifecycleExecute() throws
IOException
@@ -1459,4 +1589,29 @@ public class QueryResourceTest
{
return queryResource.doPost(new ByteArrayInputStream(bytes), null, req);
}
+
+ private org.eclipse.jetty.server.Response
jettyResponseforRequest(MockHttpServletRequest req) throws IOException
+ {
+ HttpChannel channelMock = EasyMock.mock(HttpChannel.class);
+ HttpOutput outputMock = EasyMock.mock(HttpOutput.class);
+ org.eclipse.jetty.server.Response response = new
org.eclipse.jetty.server.Response(channelMock, outputMock);
+
+ EasyMock.expect(channelMock.isSendError()).andReturn(false);
+ EasyMock.expect(channelMock.isCommitted()).andReturn(true);
+
+ outputMock.close();
+ EasyMock.expectLastCall().andVoid();
+
+ outputMock.write(EasyMock.anyObject(byte[].class), EasyMock.anyInt(),
EasyMock.anyInt());
+ EasyMock.expectLastCall().andVoid();
+
+ EasyMock.replay(outputMock, channelMock);
+
+ req.newAsyncContext(() -> {
+ final MockAsyncContext retVal = new MockAsyncContext();
+ retVal.response = response;
+ return retVal;
+ });
+ return response;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]