churromorales commented on issue #5709:
URL: https://github.com/apache/druid/issues/5709#issuecomment-1332800780

   @gianm if we change `DirectDruidClient` so that, on a query timeout, it
reports the timed-out server's segments as missing segments in the response
context, then `RetryQueryRunner` would take care of retrying them. This is a
bit hacky, but I'd like your thoughts on doing something like this:
   
   ```
           @Override
           public ClientResponse<InputStream> handleResponse(HttpResponse response, TrafficCop trafficCop)
           {
             trafficCopRef.set(trafficCop);
             checkQueryTimeout();
             checkTotalBytesLimit(response.getContent().readableBytes());
   
             log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId());
             responseStartTimeNs = System.nanoTime();
             acquireResponseMetrics().reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter);
   
             final boolean continueReading;
             try {
               log.trace(
                   "Got a response from [%s] for query ID[%s], subquery ID[%s]",
                   url,
                   query.getId(),
                   query.getSubQueryId()
               );
               final String responseContext = response.headers().get(QueryResource.HEADER_RESPONSE_CONTEXT);
               context.addRemainingResponse(query.getMostSpecificId(), VAL_TO_REDUCE_REMAINING_RESPONSES);
               // responseContext may be null in case of error or query timeout
               if (responseContext != null) {
                 context.merge(ResponseContext.deserialize(responseContext, objectMapper));
               } else {
                 // This is the case where we time out and let the RetryQueryRunner re-query
                 // the segments that could not be queried because of the timeout.
                 // ******* CHANGES BELOW *********//
                 if (query instanceof BaseQuery) {
                   BaseQuery<T> myQuery = (BaseQuery<T>) query;
                   QuerySegmentSpec segmentSpec = myQuery.getQuerySegmentSpec();
                   if (segmentSpec instanceof MultipleSpecificSegmentSpec) {
                     // responseContext is null on this path, so there is nothing to deserialize.
                     // Record the segments on the shared context so that RetryQueryRunner sees them.
                     context.addMissingSegments(((MultipleSpecificSegmentSpec) segmentSpec).getDescriptors());
                     return ClientResponse.finished(new SequenceInputStream(new Enumeration<InputStream>()
                     {
                       @Override
                       public boolean hasMoreElements()
                       {
                         return false;
                       }
   
                       @Override
                       public InputStream nextElement()
                       {
                         return null;
                       }
                     }), false);
                   }
                 }
                // ******* CHANGES ABOVE *********//
               }
               continueReading = enqueue(response.getContent(), 0L);
             }
             catch (final IOException e) {
               log.error(e, "Error parsing response context from url [%s]", url);
               return ClientResponse.finished(
                   new InputStream()
                   {
                     @Override
                     public int read() throws IOException
                     {
                       throw e;
                     }
                   }
               );
             }
             catch (InterruptedException e) {
               log.error(e, "Queue appending interrupted");
               Thread.currentThread().interrupt();
               throw new RuntimeException(e);
             }
             totalByteCount.addAndGet(response.getContent().readableBytes());
             return ClientResponse.finished(
                 new SequenceInputStream(
                     new Enumeration<InputStream>()
                     {
                       @Override
                       public boolean hasMoreElements()
                       {
                         if (fail.get() != null) {
                           throw new RE(fail.get());
                         }
                         checkQueryTimeout();
   
                         // done stays false until the last stream has been put in the queue.
                         // Until then, the stream should keep spouting good InputStreams.
                         synchronized (done) {
                           return !done.get() || !queue.isEmpty();
                         }
                       }
   
                       @Override
                       public InputStream nextElement()
                       {
                         if (fail.get() != null) {
                           throw new RE(fail.get());
                         }
   
                         try {
                           return dequeue();
                         }
                         catch (InterruptedException e) {
                           Thread.currentThread().interrupt();
                           throw new RuntimeException(e);
                         }
                       }
                     }
                 ),
                 continueReading
             );
           }
   ```
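
To make the intended flow easier to discuss, here is a minimal toy model of the idea, not Druid code. It only illustrates the mechanism described above: a server call that times out reports every segment it was asked for as missing, and a retry loop re-queries whatever is still missing. All names here (`RetrySketch`, `Response`, `Server`, `runWithRetry`, `timesOutOnce`) are hypothetical, and failing when segments are still missing after the last retry is a simplifying assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: none of these types are Druid classes.
final class RetrySketch
{
  /** What one server call yields: the rows it produced plus the segments it could not serve. */
  static final class Response
  {
    final List<String> rows = new ArrayList<>();
    final List<String> missingSegments = new ArrayList<>();
  }

  /** Hypothetical stand-in for a data server. */
  interface Server
  {
    Response query(List<String> segments);
  }

  /** Queries all segments, then re-queries whatever was reported missing, up to maxRetries times. */
  static List<String> runWithRetry(Server server, List<String> segments, int maxRetries)
  {
    List<String> rows = new ArrayList<>();
    List<String> pending = segments;
    for (int attempt = 0; attempt <= maxRetries && !pending.isEmpty(); attempt++) {
      Response response = server.query(pending);
      rows.addAll(response.rows);
      pending = response.missingSegments;
    }
    if (!pending.isEmpty()) {
      throw new IllegalStateException("Segments still missing after retries: " + pending);
    }
    return rows;
  }

  /** A server that "times out" on its first call and reports every requested segment as missing. */
  static Server timesOutOnce()
  {
    final int[] calls = {0};
    return segments -> {
      Response response = new Response();
      if (calls[0]++ == 0) {
        // Simulated timeout: no rows, everything requested is marked missing.
        response.missingSegments.addAll(segments);
      } else {
        for (String segment : segments) {
          response.rows.add("rows-of-" + segment);
        }
      }
      return response;
    };
  }

  public static void main(String[] args)
  {
    // Prints [rows-of-seg1, rows-of-seg2]: the first call times out, the retry succeeds.
    System.out.println(runWithRetry(timesOutOnce(), Arrays.asList("seg1", "seg2"), 1));
  }
}
```

With `maxRetries` set to 0 the same call throws, because the segments reported missing by the timed-out attempt are never re-queried.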


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

