github-advanced-security[bot] commented on code in PR #15399:
URL: https://github.com/apache/druid/pull/15399#discussion_r1400006292
##########
server/src/main/java/org/apache/druid/discovery/DataServerResponseHandler.java:
##########
@@ -78,38 +112,171 @@
}
);
}
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ return ClientResponse.finished(
+ new SequenceInputStream(
+ new Enumeration<InputStream>()
+ {
+ @Override
+ public boolean hasMoreElements()
+ {
+ if (fail.get() != null) {
+ throw new RE(fail.get());
+ }
+ checkQueryTimeout();
+
+ // Done is always true until the last stream has be put in the
queue.
+ // Then the stream should be spouting good InputStreams.
+ synchronized (done) {
+ return !done.get() || !queue.isEmpty();
+ }
+ }
+
+ @Override
+ public InputStream nextElement()
+ {
+ if (fail.get() != null) {
+ throw new RE(fail.get());
+ }
+
+ try {
+ return dequeue();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ ),
+ continueReading
+ );
}
@Override
- public ClientResponse<AppendableByteArrayInputStream> handleChunk(
- ClientResponse<AppendableByteArrayInputStream> clientResponse,
+ public ClientResponse<InputStream> handleChunk(
+ ClientResponse<InputStream> clientResponse,
HttpChunk chunk,
long chunkNum
)
{
- clientResponse.getObj().add(getContentBytes(chunk.getContent()));
- return clientResponse;
+ checkQueryTimeout();
+
+ final ChannelBuffer channelBuffer = chunk.getContent();
+ final int bytes = channelBuffer.readableBytes();
+
+ boolean continueReading = true;
+ if (bytes > 0) {
+ try {
+ continueReading = enqueue(channelBuffer, chunkNum);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ totalByteCount.addAndGet(bytes);
+ }
+
+ return ClientResponse.finished(clientResponse.getObj(), continueReading);
}
@Override
- public ClientResponse<InputStream>
done(ClientResponse<AppendableByteArrayInputStream> clientResponse)
+ public ClientResponse<InputStream> done(ClientResponse<InputStream>
clientResponse)
{
- final AppendableByteArrayInputStream obj = clientResponse.getObj();
- obj.done();
- return ClientResponse.finished(obj);
+ log.debug("Finished reading response for queryId[%s]", query.getId());
+ synchronized (done) {
+ try {
+ // An empty byte array is put at the end to give the
SequenceInputStream.close() as something to close out
+ // after done is set to true, regardless of the rest of the stream's
state.
+
queue.put(InputStreamHolder.fromChannelBuffer(ChannelBuffers.EMPTY_BUFFER,
Long.MAX_VALUE));
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ finally {
+ done.set(true);
+ }
+ }
+ return ClientResponse.finished(clientResponse.getObj());
}
@Override
- public void exceptionCaught(ClientResponse<AppendableByteArrayInputStream>
clientResponse, Throwable e)
+ public void exceptionCaught(ClientResponse<InputStream> clientResponse,
Throwable e)
{
- final AppendableByteArrayInputStream obj = clientResponse.getObj();
- obj.exceptionCaught(e);
+ String msg = StringUtils.format(
+ "Query[%s] failed with exception msg [%s]",
+ query.getId(),
+ e.getMessage()
+ );
+ setupResponseReadFailure(msg, e);
}
- private byte[] getContentBytes(ChannelBuffer content)
+ private boolean enqueue(ChannelBuffer buffer, long chunkNum) throws
InterruptedException
{
- byte[] contentBytes = new byte[content.readableBytes()];
- content.readBytes(contentBytes);
- return contentBytes;
+ // Increment queuedByteCount before queueing the object, so
queuedByteCount is at least as high as
+ // the actual number of queued bytes at any particular time.
+ final InputStreamHolder holder =
InputStreamHolder.fromChannelBuffer(buffer, chunkNum);
+ final long currentQueuedByteCount =
queuedByteCount.addAndGet(holder.getLength());
+ queue.put(holder);
+
+ // True if we should keep reading.
+ return !usingBackpressure || currentQueuedByteCount < maxQueuedBytes;
+ }
+
+ private InputStream dequeue() throws InterruptedException
+ {
+ final InputStreamHolder holder = queue.poll(checkQueryTimeout(),
TimeUnit.MILLISECONDS);
+ if (holder == null) {
+ throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s]
timed out.", query.getId()));
+ }
+
+ final long currentQueuedByteCount =
queuedByteCount.addAndGet(-holder.getLength());
+ if (usingBackpressure && currentQueuedByteCount < maxQueuedBytes) {
+ Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, how can
this be?").resume(holder.getChunkNum());
+ }
+
+ return holder.getStream();
+ }
+
+
+ // Returns remaining timeout or throws exception if timeout already elapsed.
+ private long checkQueryTimeout()
+ {
+ long timeLeft = failTime - System.currentTimeMillis();
+ if (timeLeft <= 0) {
+ String msg = StringUtils.format("Query[%s] timed out.", query.getId());
+ setupResponseReadFailure(msg, null);
+ throw new QueryTimeoutException(msg);
+ } else {
+ return timeLeft;
+ }
+ }
+
+ private void setupResponseReadFailure(String msg, Throwable th)
+ {
+ fail.set(msg);
+ queue.clear();
+ boolean ignored = queue.offer(
+ InputStreamHolder.fromStream(
+ new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ if (th != null) {
+ throw new IOException(msg, th);
+ } else {
+ throw new IOException(msg);
+ }
+ }
+ },
+ -1,
+ 0
+ )
+ );
Review Comment:
## Unread local variable
Variable 'boolean ignored' is never read.
[Show more details](https://github.com/apache/druid/security/code-scanning/5989)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]