gianm commented on code in PR #15084:
URL: https://github.com/apache/druid/pull/15084#discussion_r1349365474
##########
processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java:
##########
@@ -114,20 +124,34 @@ private ClientResponse<FrameFilePartialFetch> response(
return ClientResponse.finished(clientResponseObj);
}
- final byte[] chunk = new byte[content.readableBytes()];
- content.getBytes(content.readerIndex(), chunk);
+ final byte[] chunk;
+ final int chunkSize = content.readableBytes();
- try {
- final ListenableFuture<?> backpressureFuture = channel.addChunk(chunk);
+ // Potentially skip some of this chunk, if the relevant bytes have already
been read by the handler. This can
+ // happen if a request reads some data, then fails with a retryable I/O
error, and then is retried. The retry
+ // will re-read some data that has already been added to the channel, so
we need to skip it.
+ final long readByThisHandler = channel.getBytesAdded() - startOffset;
+ final long readByThisRequest = clientResponseObj.getBytesRead(); // Prior
to the current chunk
+ final long toSkip = readByThisHandler - readByThisRequest;
- if (backpressureFuture != null) {
- clientResponseObj.setBackpressureFuture(backpressureFuture);
- }
+ clientResponseObj.addBytesRead(chunkSize);
Review Comment:
It is, since "bytes read" tracks how many bytes have been read from the
response, not how many have been written to the channel. Its purpose is to let
us know when to stop skipping (we keep incrementing it until it passes
`channel.getBytesAdded() - startOffset`). I added a comment about this.
##########
processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java:
##########
@@ -114,20 +124,34 @@ private ClientResponse<FrameFilePartialFetch> response(
return ClientResponse.finished(clientResponseObj);
}
- final byte[] chunk = new byte[content.readableBytes()];
- content.getBytes(content.readerIndex(), chunk);
+ final byte[] chunk;
+ final int chunkSize = content.readableBytes();
- try {
- final ListenableFuture<?> backpressureFuture = channel.addChunk(chunk);
+ // Potentially skip some of this chunk, if the relevant bytes have already
been read by the handler. This can
+ // happen if a request reads some data, then fails with a retryable I/O
error, and then is retried. The retry
+ // will re-read some data that has already been added to the channel, so
we need to skip it.
+ final long readByThisHandler = channel.getBytesAdded() - startOffset;
+ final long readByThisRequest = clientResponseObj.getBytesRead(); // Prior
to the current chunk
+ final long toSkip = readByThisHandler - readByThisRequest;
- if (backpressureFuture != null) {
- clientResponseObj.setBackpressureFuture(backpressureFuture);
- }
+ clientResponseObj.addBytesRead(chunkSize);
- clientResponseObj.addBytesRead(chunk.length);
- }
- catch (Exception e) {
- clientResponseObj.exceptionCaught(e);
+ if (toSkip < 0) {
+ throw DruidException.defensive("Expected toSkip[%d] to be nonnegative",
toSkip);
+ } else if (toSkip < chunkSize) {
+ chunk = new byte[chunkSize - (int) toSkip];
+ content.getBytes(content.readerIndex() + (int) toSkip, chunk);
+
+ try {
+ final ListenableFuture<?> backpressureFuture = channel.addChunk(chunk);
+
+ if (backpressureFuture != null) {
+ clientResponseObj.setBackpressureFuture(backpressureFuture);
+ }
+ }
+ catch (Exception e) {
+ clientResponseObj.exceptionCaught(e);
+ }
Review Comment:
The `else` would have en empty body, so instead of adding that, I just added
a comment next to the `else if`.
##########
processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java:
##########
@@ -114,20 +124,34 @@ private ClientResponse<FrameFilePartialFetch> response(
return ClientResponse.finished(clientResponseObj);
}
- final byte[] chunk = new byte[content.readableBytes()];
- content.getBytes(content.readerIndex(), chunk);
+ final byte[] chunk;
+ final int chunkSize = content.readableBytes();
- try {
- final ListenableFuture<?> backpressureFuture = channel.addChunk(chunk);
+ // Potentially skip some of this chunk, if the relevant bytes have already
been read by the handler. This can
+ // happen if a request reads some data, then fails with a retryable I/O
error, and then is retried. The retry
+ // will re-read some data that has already been added to the channel, so
we need to skip it.
+ final long readByThisHandler = channel.getBytesAdded() - startOffset;
+ final long readByThisRequest = clientResponseObj.getBytesRead(); // Prior
to the current chunk
+ final long toSkip = readByThisHandler - readByThisRequest;
- if (backpressureFuture != null) {
- clientResponseObj.setBackpressureFuture(backpressureFuture);
- }
+ clientResponseObj.addBytesRead(chunkSize);
Review Comment:
Moved to the line before the `return`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]