Apache9 commented on code in PR #5350:
URL: https://github.com/apache/hbase/pull/5350#discussion_r1292792153
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java:
##########
@@ -55,4 +69,33 @@ public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise)
ctx.write(msg, promise);
}
}
+
+ private boolean handleFatalThreshold(ChannelHandlerContext ctx) {
+ int fatalThreshold = rpcServer.getWriteBufferFatalThreshold();
Review Comment:
We just need this threshold? Let's just pass the threshold in?
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java:
##########
@@ -95,14 +108,59 @@ void process(ByteBuf buf) throws IOException,
InterruptedException {
}
}
+ /**
+ * Sets the writable state on the connection, and tracks metrics around time
spent unwritable.
+ * When unwritable, we setAutoRead(false) so that the server does not read
any more bytes from the
+ * client until it's able to flush some outbound bytes first.
+ */
+ void setWritable(boolean newWritableValue) {
+ assert channel.eventLoop().inEventLoop();
+
+ if (!rpcServer.isWriteBufferWaterMarkEnabled()) {
+ return;
+ }
+
+ boolean oldWritableValue = this.writable;
+ this.writable = newWritableValue;
+ channel.config().setAutoRead(newWritableValue);
+
+ if (!oldWritableValue && newWritableValue) {
+ // changing from not writable to writable, update metrics
+ rpcServer.getMetrics()
+ .unwritableTime(EnvironmentEdgeManager.currentTime() -
unwritableStartTime);
+ unwritableStartTime = 0;
+ } else if (oldWritableValue && !newWritableValue) {
+ // changing from writable to non-writable, set start time
+ unwritableStartTime = EnvironmentEdgeManager.currentTime();
+ }
+ }
+
+ /**
+ * Immediately and forcibly closes the connection. To be used only from the
event loop and only in
+ * cases where a more graceful close is not possible.
+ */
+ void abort() {
+ assert channel.eventLoop().inEventLoop();
+ if (aborted) {
+ return;
+ }
+
+ // We need to forcefully abort, because otherwise memory will continue to
build up
+ // while graceful close is executed (dependent on handlers). Especially
true
+ // when SslHandler is enabled, as it prefers to send a close_notify to the
client first.
+ channel.config().setOption(ChannelOption.SO_LINGER, 0);
+ NettyUnsafeUtils.closeDirect(channel);
+ aborted = true;
Review Comment:
After thte above close, channel.isOpen could still returns true? Otherwise
we do not need to test aborted in the below isConnectionOpen method?
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java:
##########
@@ -55,4 +69,33 @@ public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise)
ctx.write(msg, promise);
}
}
+
+ private boolean handleFatalThreshold(ChannelHandlerContext ctx) {
+ int fatalThreshold = rpcServer.getWriteBufferFatalThreshold();
+ if (fatalThreshold <= 0) {
+ return false;
+ }
+
+ Channel channel = ctx.channel();
+ long outboundBytes =
NettyUnsafeUtils.getTotalPendingOutboundBytes(channel);
+ if (outboundBytes < fatalThreshold) {
+ return false;
+ }
+
+ NettyServerRpcConnection conn = NettyServerRpcConnection.get(channel);
Review Comment:
Just pass the connection in when creating the encoder? Like what we have
done in decoder?
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java:
##########
@@ -87,11 +87,11 @@
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
"VO_VOLATILE_INCREMENT",
justification = "False positive according to
http://sourceforge.net/p/findbugs/bugs/1032/")
@InterfaceAudience.Private
-abstract class ServerRpcConnection implements Closeable {
+abstract class ServerRpcConnection<T extends RpcServer> implements Closeable {
Review Comment:
This is for letting Netty related classes can get NettyRpcServer without
casting?
##########
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java:
##########
@@ -64,4 +65,6 @@ public interface MetricsHBaseServerWrapper {
int getActiveScanRpcHandlerCount();
long getNettyDmUsage();
+
+ Pair<Long, Long> getTotalAndMaxNettyOutboundBytes();
Review Comment:
Why not introducing two method for getting these two metrics? Just asking...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]