Apache9 commented on code in PR #5350:
URL: https://github.com/apache/hbase/pull/5350#discussion_r1292792153


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java:
##########
@@ -55,4 +69,33 @@ public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
       ctx.write(msg, promise);
     }
   }
+
+  private boolean handleFatalThreshold(ChannelHandlerContext ctx) {
+    int fatalThreshold = rpcServer.getWriteBufferFatalThreshold();

Review Comment:
   We just need this threshold? Let's just pass the threshold in?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java:
##########
@@ -95,14 +108,59 @@ void process(ByteBuf buf) throws IOException, 
InterruptedException {
     }
   }
 
+  /**
+   * Sets the writable state on the connection, and tracks metrics around time 
spent unwritable.
+   * When unwritable, we setAutoRead(false) so that the server does not read 
any more bytes from the
+   * client until it's able to flush some outbound bytes first.
+   */
+  void setWritable(boolean newWritableValue) {
+    assert channel.eventLoop().inEventLoop();
+
+    if (!rpcServer.isWriteBufferWaterMarkEnabled()) {
+      return;
+    }
+
+    boolean oldWritableValue = this.writable;
+    this.writable = newWritableValue;
+    channel.config().setAutoRead(newWritableValue);
+
+    if (!oldWritableValue && newWritableValue) {
+      // changing from not writable to writable, update metrics
+      rpcServer.getMetrics()
+        .unwritableTime(EnvironmentEdgeManager.currentTime() - 
unwritableStartTime);
+      unwritableStartTime = 0;
+    } else if (oldWritableValue && !newWritableValue) {
+      // changing from writable to non-writable, set start time
+      unwritableStartTime = EnvironmentEdgeManager.currentTime();
+    }
+  }
+
+  /**
+   * Immediately and forcibly closes the connection. To be used only from the 
event loop and only in
+   * cases where a more graceful close is not possible.
+   */
+  void abort() {
+    assert channel.eventLoop().inEventLoop();
+    if (aborted) {
+      return;
+    }
+
+    // We need to forcefully abort, because otherwise memory will continue to 
build up
+    // while graceful close is executed (dependent on handlers). Especially 
true
+    // when SslHandler is enabled, as it prefers to send a close_notify to the 
client first.
+    channel.config().setOption(ChannelOption.SO_LINGER, 0);
+    NettyUnsafeUtils.closeDirect(channel);
+    aborted = true;

Review Comment:
   After thte above close, channel.isOpen could still returns true? Otherwise 
we do not need to test aborted in the below isConnectionOpen method?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java:
##########
@@ -55,4 +69,33 @@ public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
       ctx.write(msg, promise);
     }
   }
+
+  private boolean handleFatalThreshold(ChannelHandlerContext ctx) {
+    int fatalThreshold = rpcServer.getWriteBufferFatalThreshold();
+    if (fatalThreshold <= 0) {
+      return false;
+    }
+
+    Channel channel = ctx.channel();
+    long outboundBytes = 
NettyUnsafeUtils.getTotalPendingOutboundBytes(channel);
+    if (outboundBytes < fatalThreshold) {
+      return false;
+    }
+
+    NettyServerRpcConnection conn = NettyServerRpcConnection.get(channel);

Review Comment:
   Just pass the connection in when creating the encoder? Like what we have 
done in decoder?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java:
##########
@@ -87,11 +87,11 @@
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"VO_VOLATILE_INCREMENT",
     justification = "False positive according to 
http://sourceforge.net/p/findbugs/bugs/1032/";)
 @InterfaceAudience.Private
-abstract class ServerRpcConnection implements Closeable {
+abstract class ServerRpcConnection<T extends RpcServer> implements Closeable {

Review Comment:
   This is for letting Netty related classes can get NettyRpcServer without 
casting?



##########
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java:
##########
@@ -64,4 +65,6 @@ public interface MetricsHBaseServerWrapper {
   int getActiveScanRpcHandlerCount();
 
   long getNettyDmUsage();
+
+  Pair<Long, Long> getTotalAndMaxNettyOutboundBytes();

Review Comment:
   Why not introducing two method for getting these two metrics? Just asking...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to