bbeaudreault commented on code in PR #5350:
URL: https://github.com/apache/hbase/pull/5350#discussion_r1294007766


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java:
##########
@@ -55,4 +69,33 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
       ctx.write(msg, promise);
     }
   }
+
+  private boolean handleFatalThreshold(ChannelHandlerContext ctx) {
+    int fatalThreshold = rpcServer.getWriteBufferFatalThreshold();
+    if (fatalThreshold <= 0) {
+      return false;
+    }
+
+    Channel channel = ctx.channel();
+    long outboundBytes = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel);
+    if (outboundBytes < fatalThreshold) {
+      return false;
+    }
+
+    NettyServerRpcConnection conn = NettyServerRpcConnection.get(channel);

Review Comment:
   Done



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java:
##########
@@ -95,14 +108,59 @@ void process(ByteBuf buf) throws IOException, InterruptedException {
     }
   }
 
+  /**
+   * Sets the writable state on the connection, and tracks metrics around time spent unwritable.
+   * When unwritable, we setAutoRead(false) so that the server does not read any more bytes from the
+   * client until it's able to flush some outbound bytes first.
+   */
+  void setWritable(boolean newWritableValue) {
+    assert channel.eventLoop().inEventLoop();
+
+    if (!rpcServer.isWriteBufferWaterMarkEnabled()) {
+      return;
+    }
+
+    boolean oldWritableValue = this.writable;
+    this.writable = newWritableValue;
+    channel.config().setAutoRead(newWritableValue);
+
+    if (!oldWritableValue && newWritableValue) {
+      // changing from not writable to writable, update metrics
+      rpcServer.getMetrics()
+        .unwritableTime(EnvironmentEdgeManager.currentTime() - unwritableStartTime);
+      unwritableStartTime = 0;
+    } else if (oldWritableValue && !newWritableValue) {
+      // changing from writable to non-writable, set start time
+      unwritableStartTime = EnvironmentEdgeManager.currentTime();
+    }
+  }
+
+  /**
+   * Immediately and forcibly closes the connection. To be used only from the event loop and only in
+   * cases where a more graceful close is not possible.
+   */
+  void abort() {
+    assert channel.eventLoop().inEventLoop();
+    if (aborted) {
+      return;
+    }
+
+    // We need to forcefully abort, because otherwise memory will continue to build up
+    // while graceful close is executed (dependent on handlers). Especially true
+    // when SslHandler is enabled, as it prefers to send a close_notify to the client first.
+    channel.config().setOption(ChannelOption.SO_LINGER, 0);
+    NettyUnsafeUtils.closeDirect(channel);
+    aborted = true;

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to