cryptoe commented on code in PR #15084:
URL: https://github.com/apache/druid/pull/15084#discussion_r1345879396


##########
processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java:
##########
@@ -114,20 +124,34 @@ private ClientResponse<FrameFilePartialFetch> response(
       return ClientResponse.finished(clientResponseObj);
     }
 
-    final byte[] chunk = new byte[content.readableBytes()];
-    content.getBytes(content.readerIndex(), chunk);
+    final byte[] chunk;
+    final int chunkSize = content.readableBytes();
 
-    try {
-      final ListenableFuture<?> backpressureFuture = channel.addChunk(chunk);
+    // Potentially skip some of this chunk, if the relevant bytes have already 
been read by the handler. This can
+    // happen if a request reads some data, then fails with a retryable I/O 
error, and then is retried. The retry
+    // will re-read some data that has already been added to the channel, so 
we need to skip it.
+    final long readByThisHandler = channel.getBytesAdded() - startOffset;
+    final long readByThisRequest = clientResponseObj.getBytesRead(); // Prior 
to the current chunk
+    final long toSkip = readByThisHandler - readByThisRequest;
 
-      if (backpressureFuture != null) {
-        clientResponseObj.setBackpressureFuture(backpressureFuture);
-      }
+    clientResponseObj.addBytesRead(chunkSize);
 
-      clientResponseObj.addBytesRead(chunk.length);
-    }
-    catch (Exception e) {
-      clientResponseObj.exceptionCaught(e);
+    if (toSkip < 0) {
+      throw DruidException.defensive("Expected toSkip[%d] to be nonnegative", 
toSkip);
+    } else if (toSkip < chunkSize) {
+      chunk = new byte[chunkSize - (int) toSkip];
+      content.getBytes(content.readerIndex() + (int) toSkip, chunk);
+
+      try {
+        final ListenableFuture<?> backpressureFuture = channel.addChunk(chunk);
+
+        if (backpressureFuture != null) {
+          clientResponseObj.setBackpressureFuture(backpressureFuture);
+        }
+      }
+      catch (Exception e) {
+        clientResponseObj.exceptionCaught(e);
+      }

Review Comment:
   Nit: toSkip> chunk size since we have already have that data in the channel, 
so we basically do nothing. Can we add an explicit else and mention that in a 
comment. 



##########
processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java:
##########
@@ -114,20 +124,34 @@ private ClientResponse<FrameFilePartialFetch> response(
       return ClientResponse.finished(clientResponseObj);
     }
 
-    final byte[] chunk = new byte[content.readableBytes()];
-    content.getBytes(content.readerIndex(), chunk);
+    final byte[] chunk;
+    final int chunkSize = content.readableBytes();
 
-    try {
-      final ListenableFuture<?> backpressureFuture = channel.addChunk(chunk);
+    // Potentially skip some of this chunk, if the relevant bytes have already 
been read by the handler. This can
+    // happen if a request reads some data, then fails with a retryable I/O 
error, and then is retried. The retry
+    // will re-read some data that has already been added to the channel, so 
we need to skip it.
+    final long readByThisHandler = channel.getBytesAdded() - startOffset;
+    final long readByThisRequest = clientResponseObj.getBytesRead(); // Prior 
to the current chunk
+    final long toSkip = readByThisHandler - readByThisRequest;
 
-      if (backpressureFuture != null) {
-        clientResponseObj.setBackpressureFuture(backpressureFuture);
-      }
+    clientResponseObj.addBytesRead(chunkSize);

Review Comment:
   In the skip case we are not basically reading all the bytes. We are skipping 
to `chunkSize - (int) toSkip]`. So does adding in all the bytes is valid?



##########
processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java:
##########
@@ -114,20 +124,34 @@ private ClientResponse<FrameFilePartialFetch> response(
       return ClientResponse.finished(clientResponseObj);
     }
 
-    final byte[] chunk = new byte[content.readableBytes()];
-    content.getBytes(content.readerIndex(), chunk);
+    final byte[] chunk;
+    final int chunkSize = content.readableBytes();
 
-    try {
-      final ListenableFuture<?> backpressureFuture = channel.addChunk(chunk);
+    // Potentially skip some of this chunk, if the relevant bytes have already 
been read by the handler. This can
+    // happen if a request reads some data, then fails with a retryable I/O 
error, and then is retried. The retry
+    // will re-read some data that has already been added to the channel, so 
we need to skip it.
+    final long readByThisHandler = channel.getBytesAdded() - startOffset;
+    final long readByThisRequest = clientResponseObj.getBytesRead(); // Prior 
to the current chunk
+    final long toSkip = readByThisHandler - readByThisRequest;
 
-      if (backpressureFuture != null) {
-        clientResponseObj.setBackpressureFuture(backpressureFuture);
-      }
+    clientResponseObj.addBytesRead(chunkSize);

Review Comment:
   Nit: I would expect to add bytes read at the end before returning the 
clientResponseObject. 
   More of a readability thing. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to