github-advanced-security[bot] commented on code in PR #15399:
URL: https://github.com/apache/druid/pull/15399#discussion_r1398658316
##########
server/src/main/java/org/apache/druid/discovery/DataServerResponseHandler.java:
##########
@@ -112,4 +219,69 @@
content.readBytes(contentBytes);
return contentBytes;
}
+
+ private boolean enqueue(ChannelBuffer buffer, long chunkNum) throws
InterruptedException
+ {
+ // Increment queuedByteCount before queueing the object, so
queuedByteCount is at least as high as
+ // the actual number of queued bytes at any particular time.
+ final InputStreamHolder holder =
InputStreamHolder.fromChannelBuffer(buffer, chunkNum);
+ final long currentQueuedByteCount =
queuedByteCount.addAndGet(holder.getLength());
+ queue.put(holder);
+
+ // True if we should keep reading.
+ return !usingBackpressure || currentQueuedByteCount < maxQueuedBytes;
+ }
+
+ private InputStream dequeue() throws InterruptedException
+ {
+ final InputStreamHolder holder = queue.poll(checkQueryTimeout(),
TimeUnit.MILLISECONDS);
+ if (holder == null) {
+ throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s]
timed out.", query.getId()));
+ }
+
+ final long currentQueuedByteCount =
queuedByteCount.addAndGet(-holder.getLength());
+ if (usingBackpressure && currentQueuedByteCount < maxQueuedBytes) {
+ Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, how can
this be?").resume(holder.getChunkNum());
+ }
+
+ return holder.getStream();
+ }
+
+
+ // Returns remaining timeout or throws exception if timeout already elapsed.
+ private long checkQueryTimeout()
+ {
+ long timeLeft = failTime - System.currentTimeMillis();
+ if (timeLeft <= 0) {
+ String msg = StringUtils.format("Query[%s] timed out.", query.getId());
+ setupResponseReadFailure(msg, null);
+ throw new QueryTimeoutException(msg);
+ } else {
+ return timeLeft;
+ }
+ }
+
+ private void setupResponseReadFailure(String msg, Throwable th)
+ {
+ fail.set(msg);
+ queue.clear();
+ queue.offer(
+ InputStreamHolder.fromStream(
+ new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ if (th != null) {
+ throw new IOException(msg, th);
+ } else {
+ throw new IOException(msg);
+ }
+ }
+ },
+ -1,
+ 0
+ )
+ );
Review Comment:
## Ignored error status of call
Method setupResponseReadFailure ignores exceptional return value of
BlockingQueue<InputStreamHolder>.offer.
[Show more details](https://github.com/apache/druid/security/code-scanning/5988)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]