cheddar commented on a change in pull request #12032:
URL: https://github.com/apache/druid/pull/12032#discussion_r773669393



##########
File path: 
core/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
##########
@@ -227,30 +221,27 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
                 }
 
                 assert currentChunkNum == 0;
-                possiblySuspendReads(response);
-
-                if (!httpResponse.isChunked()) {
-                  finishRequest();
-                }
-              } else if (msg instanceof HttpChunk) {
-                HttpChunk httpChunk = (HttpChunk) msg;
+                possiblyRead(response);
+              } else if (msg instanceof HttpContent) {
+                HttpContent httpChunk = (HttpContent) msg;
                 if (log.isDebugEnabled()) {
                   log.debug(
                       "[%s] Got chunk: %sB, last=%s",
                       requestDesc,
-                      httpChunk.getContent().readableBytes(),
-                      httpChunk.isLast()
+                      httpChunk.content().readableBytes(),
+                      httpChunk instanceof LastHttpContent
                   );
                 }
 
-                if (httpChunk.isLast()) {
+                response = handler.handleChunk(response, httpChunk, ++currentChunkNum);
+                if (response.isFinished() && !retVal.isDone()) {
+                  retVal.set((Final) response.getObj());
+                }
+
+                if (httpChunk instanceof LastHttpContent) {

Review comment:
       Is there a reason not to merge the logic from this `if` with the 
`if (response.isFinished() ...)` above?

##########
File path: 
core/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
##########
@@ -267,17 +258,23 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
 
               throw ex;
             }
+            finally {
+              ReferenceCountUtil.release(msg);
+            }
           }
 
-          private void possiblySuspendReads(ClientResponse<?> response)
+          private void possiblyRead(ClientResponse<?> response)
           {
-            if (!response.isContinueReading()) {
+            if (response.isContinueReading()) {
+              channel.read();
+            } else {
               synchronized (watermarkLock) {
                 suspendWatermark = Math.max(suspendWatermark, currentChunkNum);
                 if (suspendWatermark > resumeWatermark) {
-                  channel.setReadable(false);
                   backPressureStartTimeNs = System.nanoTime();
                   log.debug("[%s] Suspended reads from channel (chunkNum = %,d).", requestDesc, currentChunkNum);

Review comment:
       Nit: this should really be "delaying reads from channel" now.

##########
File path: 
core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java
##########
@@ -56,18 +56,15 @@
   @Override
   public ClientResponse<InputStream> handleResponse(HttpResponse response, TrafficCop trafficCop)
   {
-    try (ChannelBufferInputStream channelStream = new ChannelBufferInputStream(response.getContent())) {
-      queue.put(channelStream);
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
+    try {
+      // add empty initial buffer since SequenceInputStream will peek the first element right away

Review comment:
       This comment doesn't tell me why it's a problem that it peeks the first 
element right away. Should we fix that instead of priming the queue?
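
For context on the peeking behavior: `java.io.SequenceInputStream` fetches the first stream from its `Enumeration` inside the constructor, before any `read()` call. If `nextElement()` is backed by a blocking take from the queue (an assumption on my part, I have not checked the rest of the handler), construction would presumably block until the first chunk arrives, which may be what the priming works around. A minimal, self-contained sketch of the eager fetch; `EagerPeekDemo` and its counter are hypothetical names for illustration and are not part of the PR:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;
import java.util.concurrent.atomic.AtomicInteger;

public class EagerPeekDemo
{
  // Returns how many streams SequenceInputStream pulled from the
  // enumeration during construction, i.e. before any read() call.
  static int streamsPulledByConstructor()
  {
    final AtomicInteger pulled = new AtomicInteger();
    Enumeration<InputStream> streams = new Enumeration<InputStream>()
    {
      @Override
      public boolean hasMoreElements()
      {
        return pulled.get() < 2;
      }

      @Override
      public InputStream nextElement()
      {
        // In a queue-backed enumeration this is where a blocking take would sit.
        pulled.incrementAndGet();
        return new ByteArrayInputStream(new byte[]{1});
      }
    };

    // Construct only; nothing is read from the stream.
    new SequenceInputStream(streams);
    return pulled.get();
  }

  public static void main(String[] args)
  {
    System.out.println("streams pulled before first read: " + streamsPulledByConstructor());
  }
}
```

If that is the reason for the priming, saying so in the code comment would answer the question above.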

##########
File path: 
core/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
##########
@@ -355,30 +355,31 @@ public void channelDisconnected(ChannelHandlerContext context, ChannelStateEvent
           private void removeHandlers()
           {
             if (readTimeout > 0) {
-              channel.getPipeline().remove(READ_TIMEOUT_HANDLER_NAME);
+              channel.pipeline().remove(READ_TIMEOUT_HANDLER_NAME);
             }
-            channel.getPipeline().remove(LAST_HANDLER_NAME);
+            channel.pipeline().remove(LAST_HANDLER_NAME);
           }
         }
     );
 
-    channel.write(httpRequest).addListener(
-        new ChannelFutureListener()
-        {
-          @Override
-          public void operationComplete(ChannelFuture future)
-          {
-            if (!future.isSuccess()) {
-              channel.close();
-              channelResourceContainer.returnResource();
-              if (!retVal.isDone()) {
-                retVal.setException(
-                    new ChannelException(
-                        StringUtils.format("[%s] Failed to write request to channel", requestDesc),
-                        future.getCause()
-                    )
-                );
-              }
+    channel.write(httpRequest);
+    if (request.hasContent()) {
+      channel.write(new DefaultHttpContent(request.getContent()));

Review comment:
       Is there really no way to add this to the `httpRequest` object such that 
it "just works"?  I haven't looked at the interfaces on Netty 4 to verify this, 
so it's perhaps a naive question.

##########
File path: 
core/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
##########
@@ -192,18 +179,25 @@ public void stop()
           private long resumeWatermark = -1;
 
           @Override
-          public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+          public void channelRead(ChannelHandlerContext ctx, Object msg)
           {
             if (log.isDebugEnabled()) {

Review comment:
       This is a nit, but this can probably move inside the `try`.  If, for 
some reason, this threw an exception, we wouldn't release the `msg` resource.
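
A minimal sketch of the concern, using a hypothetical `Msg` class as a stand-in for a reference-counted Netty message (it is not Netty's API, and all names are made up for illustration). If the statement that throws sits before the `try`, the `finally` never runs and the reference is never released:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ReleaseInFinallyDemo
{
  // Hypothetical stand-in for a reference-counted message; starts with one reference.
  static final class Msg
  {
    final AtomicInteger refCnt = new AtomicInteger(1);

    void release()
    {
      refCnt.decrementAndGet();
    }
  }

  // Simulates a debug-log call that may fail.
  static void debugLog(boolean shouldThrow)
  {
    if (shouldThrow) {
      throw new IllegalStateException("logging failed");
    }
  }

  // Logging before the try: an exception here bypasses the finally block.
  static void logOutsideTry(Msg msg, boolean logThrows)
  {
    debugLog(logThrows);
    try {
      // message handling would go here
    }
    finally {
      msg.release();
    }
  }

  // Logging inside the try: the finally block always releases the message.
  static void logInsideTry(Msg msg, boolean logThrows)
  {
    try {
      debugLog(logThrows);
      // message handling would go here
    }
    finally {
      msg.release();
    }
  }

  // Runs one variant with a failing log call and reports the remaining reference count.
  static int refCntAfterFailure(boolean insideTry)
  {
    Msg msg = new Msg();
    try {
      if (insideTry) {
        logInsideTry(msg, true);
      } else {
        logOutsideTry(msg, true);
      }
    }
    catch (IllegalStateException expected) {
      // the failure itself is expected; only the reference count matters here
    }
    return msg.refCnt.get();
  }
}
```

With the log call outside the `try`, the count stays at 1 after the failure (a leak); with it inside, the count drops to 0.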

##########
File path: 
core/src/main/java/org/apache/druid/java/util/http/client/Request.java
##########
@@ -90,19 +76,38 @@ public boolean hasContent()
     return content != null;
   }
 
-  public ChannelBuffer getContent()
+  public ByteBuf getContent()
   {
-    return content;
+    // return a duplicate buffer since with increased reference count
+    // this ensures Netty does not free the underlying array after it gets handled,
+    // since we sometimes read the buffer after it has been dispatched to Netty
+    // (e.g. when alling withUrl or copy, which migh happen after Netty has handled it already)

Review comment:
       nit: missing `c` on `alling`.  Also missing `t` on `migh`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


