churromorales commented on issue #5709:
URL: https://github.com/apache/druid/issues/5709#issuecomment-1332800780
@gianm if we change `DirectDruidClient` so that it reports the segments of the
timed-out server as missing, then the `RetryQueryRunner` would take care of
retrying them. This is a bit hacky, but I'd like to know your thoughts on doing
something like this:
```java
@Override
public ClientResponse<InputStream> handleResponse(HttpResponse response, TrafficCop trafficCop)
{
  trafficCopRef.set(trafficCop);
  checkQueryTimeout();
  checkTotalBytesLimit(response.getContent().readableBytes());
  log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId());
  responseStartTimeNs = System.nanoTime();
  acquireResponseMetrics().reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter);
  final boolean continueReading;
  try {
    log.trace(
        "Got a response from [%s] for query ID[%s], subquery ID[%s]",
        url,
        query.getId(),
        query.getSubQueryId()
    );
    final String responseContext = response.headers().get(QueryResource.HEADER_RESPONSE_CONTEXT);
    context.addRemainingResponse(query.getMostSpecificId(), VAL_TO_REDUCE_REMAINING_RESPONSES);
    // context may be null in case of error or query timeout
    if (responseContext != null) {
      context.merge(ResponseContext.deserialize(responseContext, objectMapper));
    } else {
      // This is the case where we time out and let the RetryQueryRunner query
      // the segments that could not be queried because of the query timeout.
      // ******* CHANGES BELOW *********//
      if (query instanceof BaseQuery) {
        BaseQuery<T> myQuery = (BaseQuery<T>) query;
        QuerySegmentSpec segmentSpec = myQuery.getQuerySegmentSpec();
        if (segmentSpec instanceof MultipleSpecificSegmentSpec) {
          // Record the requested segments as missing on the query's response context
          // so that RetryQueryRunner can pick them up. responseContext is null in
          // this branch, so there is nothing to deserialize or merge.
          context.addMissingSegments(((MultipleSpecificSegmentSpec) segmentSpec).getDescriptors());
          // Return an empty stream and stop reading from this server.
          return ClientResponse.finished(
              new SequenceInputStream(
                  new Enumeration<InputStream>()
                  {
                    @Override
                    public boolean hasMoreElements()
                    {
                      return false;
                    }

                    @Override
                    public InputStream nextElement()
                    {
                      return null;
                    }
                  }
              ),
              false
          );
        }
      }
      // ******* CHANGES ABOVE *********//
    }
    continueReading = enqueue(response.getContent(), 0L);
  }
  catch (final IOException e) {
    log.error(e, "Error parsing response context from url [%s]", url);
    return ClientResponse.finished(
        new InputStream()
        {
          @Override
          public int read() throws IOException
          {
            throw e;
          }
        }
    );
  }
  catch (InterruptedException e) {
    log.error(e, "Queue appending interrupted");
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
  totalByteCount.addAndGet(response.getContent().readableBytes());
  return ClientResponse.finished(
      new SequenceInputStream(
          new Enumeration<InputStream>()
          {
            @Override
            public boolean hasMoreElements()
            {
              if (fail.get() != null) {
                throw new RE(fail.get());
              }
              checkQueryTimeout();
              // done is always false until the last stream has been put in the queue.
              // Then the stream should be spouting good InputStreams.
              synchronized (done) {
                return !done.get() || !queue.isEmpty();
              }
            }

            @Override
            public InputStream nextElement()
            {
              if (fail.get() != null) {
                throw new RE(fail.get());
              }
              try {
                return dequeue();
              }
              catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
              }
            }
          }
      ),
      continueReading
  );
}
```
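
To make the intent concrete, here is a minimal standalone sketch of the retry-on-missing-segments idea. It is not Druid code: `RetrySketch`, `Result`, and `runWithRetry` are hypothetical names, segments are plain strings, and the actual `RetryQueryRunner` is more involved. It only shows the flow I have in mind: a timed-out server reports all of its segments as missing, and the caller re-issues the query for exactly those segments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; none of these are Druid classes.
final class RetrySketch
{
  /** Outcome of one attempt: the rows returned plus the segments that could not be served. */
  static final class Result
  {
    final List<String> rows;
    final List<String> missingSegments;

    Result(List<String> rows, List<String> missingSegments)
    {
      this.rows = rows;
      this.missingSegments = missingSegments;
    }
  }

  /**
   * Runs the query once, then re-issues it for whatever segments were reported
   * missing, at most maxRetries more times. Returns all rows collected, or throws
   * if some segments are still missing after the last attempt.
   */
  static List<String> runWithRetry(
      List<String> segments,
      Function<List<String>, Result> runner,
      int maxRetries
  )
  {
    final List<String> rows = new ArrayList<>();
    List<String> pending = segments;
    for (int attempt = 0; attempt <= maxRetries && !pending.isEmpty(); attempt++) {
      final Result result = runner.apply(pending);
      rows.addAll(result.rows);
      // Only the segments reported as missing are queried on the next attempt.
      pending = result.missingSegments;
    }
    if (!pending.isEmpty()) {
      throw new IllegalStateException("Segments still missing after retries: " + pending);
    }
    return rows;
  }
}
```

In this sketch, a `runner` that returns no rows and echoes its input back as `missingSegments` plays the role of the timed-out server in the change above; the next attempt then receives the same segment list again.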