Apache9 commented on code in PR #5350:
URL: https://github.com/apache/hbase/pull/5350#discussion_r1294611221


##########
hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+
+/**
+ * Wraps some usages of netty's unsafe API, for ease of maintainability.
+ */
[email protected]
+public final class NettyUnsafeUtils {
+
+  private NettyUnsafeUtils() {
+  }
+
+  /**
+   * Directly closes the channel, skipping any handlers in the pipeline

Review Comment:
   It would be better to add more comments explaining where we will use this, 
so later developers will know whether they should use this method.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java:
##########
@@ -108,28 +147,34 @@ public NettyRpcServer(Server server, String name, 
List<BlockingServiceAndInterfa
     if (config == null) {
       config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer");
     }
+
+    // call before creating bootstrap below so that the necessary configs can 
be set
+    configureNettyWatermarks(conf);
+
     EventLoopGroup eventLoopGroup = config.group();
     Class<? extends ServerChannel> channelClass = config.serverChannelClass();
-    ServerBootstrap bootstrap = new 
ServerBootstrap().group(eventLoopGroup).channel(channelClass)
+    bootstrap = new 
ServerBootstrap().group(eventLoopGroup).channel(channelClass)

Review Comment:
   Why do we need to make this a class field?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java:
##########
@@ -142,6 +187,71 @@ protected void initChannel(Channel ch) throws Exception {
     this.scheduler.init(new RpcSchedulerContext(this));
   }
 
+  @Override
+  public void onConfigurationChange(Configuration newConf) {
+    super.onConfigurationChange(newConf);
+    configureNettyWatermarks(newConf);
+  }
+
+  private void configureNettyWatermarks(Configuration conf) {
+    int watermarkLow =
+      conf.getInt(CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 
CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT);
+    int watermarkHigh =
+      conf.getInt(CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 
CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT);
+    int fatalThreshold =
+      conf.getInt(CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, 
CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT);
+
+    WriteBufferWaterMark oldWaterMark = writeBufferWaterMark;
+    int oldFatalThreshold = writeBufferFatalThreshold;
+
+    boolean disabled = false;
+    if (watermarkHigh == 0 && watermarkLow == 0) {
+      // if both are 0, use the netty default, which we will treat as 
"disabled".
+      // when disabled, we won't manage autoRead in response to writability 
changes.
+      writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
+      disabled = true;
+    } else {
+      writeBufferWaterMark = new WriteBufferWaterMark(watermarkLow, 
watermarkHigh);
+
+      // only apply this check when watermark is enabled. this way we give the 
operator some
+      // flexibility if they want to try enabling fatal threshold without 
backpressure.
+      if (fatalThreshold > 0 && fatalThreshold <= watermarkHigh) {
+        LOG.warn("Detected a {} value of {}, which is lower than the {} value 
of {}, ignoring.",
+          CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, fatalThreshold, 
CHANNEL_WRITABLE_HIGH_WATERMARK_KEY,
+          watermarkHigh);
+        fatalThreshold = 0;
+      }
+    }
+
+    writeBufferFatalThreshold = fatalThreshold;
+
+    if (
+      oldWaterMark != null && (oldWaterMark.low() != writeBufferWaterMark.low()
+        || oldWaterMark.high() != writeBufferWaterMark.high()
+        || oldFatalThreshold != writeBufferFatalThreshold)
+    ) {
+      LOG.info("Updated netty outbound write buffer watermarks: low={}, 
high={}, fatal={}",
+        disabled ? "disabled" : writeBufferWaterMark.low(),
+        disabled ? "disabled" : writeBufferWaterMark.high(), 
writeBufferFatalThreshold);
+    }
+
+    // update any existing channels
+    if (bootstrap != null) {

Review Comment:
   bootstrap is assigned in the constructor, so it will never be null?



##########
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java:
##########
@@ -64,4 +65,6 @@ public interface MetricsHBaseServerWrapper {
   int getActiveScanRpcHandlerCount();
 
   long getNettyDmUsage();
+
+  Pair<Long, Long> getTotalAndMaxNettyOutboundBytes();

Review Comment:
   It would be better to add some comments to describe this — something like 
'these two numbers are calculated together, so we want to return them at once'.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to