This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 29ecfc50e23 HBASE-27947 RegionServer OOM when outbound channel backed
up (#5350)
29ecfc50e23 is described below
commit 29ecfc50e23a584ffb86c5fc759eef662a9c0182
Author: Bryan Beaudreault <[email protected]>
AuthorDate: Fri Aug 18 09:59:38 2023 -0400
HBASE-27947 RegionServer OOM when outbound channel backed up (#5350)
Signed-off-by: Duo Zhang <[email protected]>
Reviewed-by: Norman Maurer <[email protected]>
---
.../apache/hadoop/hbase/util/NettyUnsafeUtils.java | 61 +++++++
.../hadoop/hbase/ipc/MetricsHBaseServerSource.java | 16 ++
.../hbase/ipc/MetricsHBaseServerWrapper.java | 7 +
.../hbase/ipc/MetricsHBaseServerSourceImpl.java | 24 +++
.../hadoop/hbase/ipc/MetricsHBaseServer.java | 8 +
.../hbase/ipc/MetricsHBaseServerWrapperImpl.java | 13 ++
.../apache/hadoop/hbase/ipc/NettyRpcServer.java | 160 +++++++++++++++++-
.../NettyRpcServerChannelWritabilityHandler.java | 125 ++++++++++++++
.../hbase/ipc/NettyRpcServerPreambleHandler.java | 11 +-
.../apache/hadoop/hbase/ipc/NettyServerCall.java | 2 +-
.../hadoop/hbase/ipc/FailingNettyRpcServer.java | 9 +-
.../hbase/ipc/MetricsHBaseServerWrapperStub.java | 7 +
.../hbase/ipc/TestNettyChannelWritability.java | 182 +++++++++++++++++++++
.../apache/hadoop/hbase/ipc/TestRpcMetrics.java | 9 +
.../hbase/ipc/TestRpcSkipInitialSaslHandshake.java | 28 +---
15 files changed, 618 insertions(+), 44 deletions(-)
diff --git
a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java
new file mode 100644
index 00000000000..8b246e978ea
--- /dev/null
+++
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundBuffer;
+
+/**
+ * Wraps some usages of netty's unsafe API, for ease of maintainability.
+ */
[email protected]
+public final class NettyUnsafeUtils {
+
+ private NettyUnsafeUtils() {
+ }
+
+ /**
+ * Directly closes the channel, setting SO_LINGER to 0 and skipping any
handlers in the pipeline.
+ * This is useful for cases where it's important to immediately close
without any delay.
+ * Otherwise, pipeline handlers and even general TCP flows can cause a
normal close to take
+ * upwards of a few second or more. This will likely cause the client side
to see either a
+ * "Connection reset by peer" or unexpected ConnectionClosedException.
+ * <p>
+ * <b>It's necessary to call this from within the channel's eventLoop!</b>
+ */
+ public static void closeImmediately(Channel channel) {
+ assert channel.eventLoop().inEventLoop();
+ channel.config().setOption(ChannelOption.SO_LINGER, 0);
+ channel.unsafe().close(channel.voidPromise());
+ }
+
+ /**
+ * Get total bytes pending write to socket
+ */
+ public static long getTotalPendingOutboundBytes(Channel channel) {
+ ChannelOutboundBuffer outboundBuffer = channel.unsafe().outboundBuffer();
+ // can be null when the channel is closing
+ if (outboundBuffer == null) {
+ return 0;
+ }
+ return outboundBuffer.totalPendingWriteBytes();
+ }
+}
diff --git
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 98ecf8b8d92..df2e335a718 100644
---
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -46,6 +46,14 @@ public interface MetricsHBaseServerSource extends
ExceptionTrackingSource {
String PROCESS_CALL_TIME_DESC = "Processing call time.";
String TOTAL_CALL_TIME_NAME = "totalCallTime";
String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and
processing time.";
+
+ String UNWRITABLE_TIME_NAME = "unwritableTime";
+ String UNWRITABLE_TIME_DESC =
+ "Time where an channel was unwritable due to having too many outbound
bytes";
+ String MAX_OUTBOUND_BYTES_EXCEEDED_NAME = "maxOutboundBytesExceeded";
+ String MAX_OUTBOUND_BYTES_EXCEEDED_DESC =
+ "Number of times a connection was closed because the channel outbound "
+ + "bytes exceeded the configured max.";
String QUEUE_SIZE_NAME = "queueSize";
String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has
been read and "
+ "parsed and is waiting to run or is currently being executed.";
@@ -97,6 +105,10 @@ public interface MetricsHBaseServerSource extends
ExceptionTrackingSource {
String NETTY_DM_USAGE_NAME = "nettyDirectMemoryUsage";
String NETTY_DM_USAGE_DESC = "Current Netty direct memory usage.";
+ String NETTY_TOTAL_PENDING_OUTBOUND_NAME = "nettyTotalPendingOutboundBytes";
+ String NETTY_TOTAL_PENDING_OUTBOUND_DESC = "Current total bytes pending
write to all channel";
+ String NETTY_MAX_PENDING_OUTBOUND_NAME = "nettyMaxPendingOutboundBytes";
+ String NETTY_MAX_PENDING_OUTBOUND_DESC = "Current maximum bytes pending
write to any channel";
void authorizationSuccess();
@@ -121,4 +133,8 @@ public interface MetricsHBaseServerSource extends
ExceptionTrackingSource {
void processedCall(int processingTime);
void queuedAndProcessedCall(int totalTime);
+
+ void unwritableTime(long unwritableTime);
+
+ void maxOutboundBytesExceeded();
}
diff --git
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
index 1a8980bbc7b..bb376cba930 100644
---
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
+++
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.ipc;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
@@ -64,4 +65,10 @@ public interface MetricsHBaseServerWrapper {
int getActiveScanRpcHandlerCount();
long getNettyDmUsage();
+
+ /**
+ * These two metrics are calculated together, so we want to return them in
one call
+ * @return pair containing total (first) and max (second) pending outbound
bytes.
+ */
+ Pair<Long, Long> getTotalAndMaxNettyOutboundBytes();
}
diff --git
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index 9c75f4e6bcb..1a6d557d8ad 100644
---
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.metrics.ExceptionTrackingSourceImpl;
import org.apache.hadoop.hbase.metrics.Interns;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -36,10 +37,12 @@ public class MetricsHBaseServerSourceImpl extends
ExceptionTrackingSourceImpl
private final MutableFastCounter authenticationFallbacks;
private final MutableFastCounter sentBytes;
private final MutableFastCounter receivedBytes;
+ private final MutableFastCounter maxOutboundBytesExceeded;
private MetricHistogram queueCallTime;
private MetricHistogram processCallTime;
private MetricHistogram totalCallTime;
+ private MetricHistogram unwritableTime;
private MetricHistogram requestSize;
private MetricHistogram responseSize;
@@ -67,6 +70,10 @@ public class MetricsHBaseServerSourceImpl extends
ExceptionTrackingSourceImpl
this.getMetricsRegistry().newTimeHistogram(PROCESS_CALL_TIME_NAME,
PROCESS_CALL_TIME_DESC);
this.totalCallTime =
this.getMetricsRegistry().newTimeHistogram(TOTAL_CALL_TIME_NAME,
TOTAL_CALL_TIME_DESC);
+ this.unwritableTime =
+ this.getMetricsRegistry().newTimeHistogram(UNWRITABLE_TIME_NAME,
UNWRITABLE_TIME_DESC);
+ this.maxOutboundBytesExceeded = this.getMetricsRegistry()
+ .newCounter(MAX_OUTBOUND_BYTES_EXCEEDED_NAME,
MAX_OUTBOUND_BYTES_EXCEEDED_DESC, 0);
this.requestSize =
this.getMetricsRegistry().newSizeHistogram(REQUEST_SIZE_NAME,
REQUEST_SIZE_DESC);
this.responseSize =
@@ -133,6 +140,16 @@ public class MetricsHBaseServerSourceImpl extends
ExceptionTrackingSourceImpl
totalCallTime.add(totalTime);
}
+ @Override
+ public void unwritableTime(long unwritableTime) {
+ this.unwritableTime.add(unwritableTime);
+ }
+
+ @Override
+ public void maxOutboundBytesExceeded() {
+ maxOutboundBytesExceeded.incr();
+ }
+
@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName);
@@ -177,6 +194,13 @@ public class MetricsHBaseServerSourceImpl extends
ExceptionTrackingSourceImpl
wrapper.getActiveScanRpcHandlerCount())
.addGauge(Interns.info(NETTY_DM_USAGE_NAME, NETTY_DM_USAGE_DESC),
wrapper.getNettyDmUsage());
+
+ Pair<Long, Long> totalAndMax =
wrapper.getTotalAndMaxNettyOutboundBytes();
+ mrb.addGauge(
+ Interns.info(NETTY_TOTAL_PENDING_OUTBOUND_NAME,
NETTY_TOTAL_PENDING_OUTBOUND_DESC),
+ totalAndMax.getFirst());
+ mrb.addGauge(Interns.info(NETTY_MAX_PENDING_OUTBOUND_NAME,
NETTY_MAX_PENDING_OUTBOUND_DESC),
+ totalAndMax.getSecond());
}
metricsRegistry.snapshot(mrb, all);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
index a4c73f925d3..b5fbb5c43d1 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
@@ -97,6 +97,14 @@ public class MetricsHBaseServer {
source.queuedAndProcessedCall(totalTime);
}
+ void unwritableTime(long unwritableTime) {
+ source.unwritableTime(unwritableTime);
+ }
+
+ void maxOutboundBytesExceeded() {
+ source.maxOutboundBytesExceeded();
+ }
+
public void exception(Throwable throwable) {
source.exception();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index 857315568c5..1fc1806265d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.util.DirectMemoryUtils;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
@@ -209,4 +210,16 @@ public class MetricsHBaseServerWrapperImpl implements
MetricsHBaseServerWrapper
return DirectMemoryUtils.getNettyDirectMemoryUsage();
}
+
+ @Override
+ public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() {
+ if (
+ !isServerStarted() || this.server.getScheduler() == null
+ || !(this.server instanceof NettyRpcServer)
+ ) {
+ return Pair.newPair(0L, 0L);
+ }
+
+ return ((NettyRpcServer) server).getTotalAndMaxNettyOutboundBytes();
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index bd024c90c3f..e83ad05f170 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
+import org.apache.hadoop.hbase.util.NettyUnsafeUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
@@ -60,6 +61,7 @@ import
org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
+import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark;
import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
import
org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder;
@@ -91,6 +93,38 @@ public class NettyRpcServer extends RpcServer {
static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
static final String HEAP_ALLOCATOR_TYPE = "heap";
+ /**
+ * Low watermark for pending outbound bytes of a single netty channel. If
the high watermark was
+ * exceeded, channel will have setAutoRead to true again. The server will
start reading incoming
+ * bytes (requests) from the client channel.
+ */
+ public static final String CHANNEL_WRITABLE_LOW_WATERMARK_KEY =
+ "hbase.server.netty.writable.watermark.low";
+ private static final int CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT = 0;
+
+ /**
+ * High watermark for pending outbound bytes of a single netty channel. If
the number of pending
+ * outbound bytes exceeds this threshold, setAutoRead will be false for the
channel. The server
+ * will stop reading incoming requests from the client channel.
+ * <p>
+ * Note: any requests already in the call queue will still be processed.
+ */
+ public static final String CHANNEL_WRITABLE_HIGH_WATERMARK_KEY =
+ "hbase.server.netty.writable.watermark.high";
+ private static final int CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT = 0;
+
+ /**
+ * Fatal watermark for pending outbound bytes of a single netty channel. If
the number of pending
+ * outbound bytes exceeds this threshold, the connection will be forcibly
closed so that memory
+ * can be reclaimed. The client will have to re-establish a new connection
and retry any in-flight
+ * requests.
+ * <p>
+ * Note: must be higher than the high watermark, otherwise it's ignored.
+ */
+ public static final String CHANNEL_WRITABLE_FATAL_WATERMARK_KEY =
+ "hbase.server.netty.writable.watermark.fatal";
+ private static final int CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT = 0;
+
private final InetSocketAddress bindAddress;
private final CountDownLatch closed = new CountDownLatch(1);
@@ -101,6 +135,9 @@ public class NettyRpcServer extends RpcServer {
private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new
AtomicReference<>();
private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new
AtomicReference<>();
+ private volatile int writeBufferFatalThreshold;
+ private volatile WriteBufferWaterMark writeBufferWaterMark;
+
public NettyRpcServer(Server server, String name,
List<BlockingServiceAndInterface> services,
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
boolean reservoirEnabled) throws IOException {
@@ -115,6 +152,10 @@ public class NettyRpcServer extends RpcServer {
if (config == null) {
config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer");
}
+
+ // call before creating bootstrap below so that the necessary configs can
be set
+ configureNettyWatermarks(conf);
+
EventLoopGroup eventLoopGroup = config.group();
Class<? extends ServerChannel> channelClass = config.serverChannelClass();
ServerBootstrap bootstrap = new
ServerBootstrap().group(eventLoopGroup).channel(channelClass)
@@ -124,6 +165,7 @@ public class NettyRpcServer extends RpcServer {
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
+ ch.config().setWriteBufferWaterMark(writeBufferWaterMark);
ch.config().setAllocator(channelAllocator);
ChannelPipeline pipeline = ch.pipeline();
FixedLengthFrameDecoder preambleDecoder = new
FixedLengthFrameDecoder(6);
@@ -131,12 +173,18 @@ public class NettyRpcServer extends RpcServer {
if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) {
initSSL(pipeline,
conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true));
}
+ NettyServerRpcConnection conn = createNettyServerRpcConnection(ch);
pipeline.addLast(NettyRpcServerPreambleHandler.DECODER_NAME,
preambleDecoder)
- .addLast(createNettyRpcServerPreambleHandler())
+ .addLast(new NettyRpcServerPreambleHandler(NettyRpcServer.this,
conn))
// We need NettyRpcServerResponseEncoder here because
NettyRpcServerPreambleHandler may
// send RpcResponse to client.
- .addLast(NettyRpcServerResponseEncoder.NAME,
- new NettyRpcServerResponseEncoder(metrics));
+ .addLast(NettyRpcServerResponseEncoder.NAME, new
NettyRpcServerResponseEncoder(metrics))
+ // Add writability handler after the response encoder, so we can
abort writes before
+ // they get encoded, if the fatal threshold is exceeded. We pass
in suppliers here so
+ // that the handler configs can be live updated via update_config.
+ .addLast(NettyRpcServerChannelWritabilityHandler.NAME,
+ new NettyRpcServerChannelWritabilityHandler(metrics, () ->
writeBufferFatalThreshold,
+ () -> isWritabilityBackpressureEnabled()));
}
});
try {
@@ -149,6 +197,91 @@ public class NettyRpcServer extends RpcServer {
this.scheduler.init(new RpcSchedulerContext(this));
}
+ @Override
+ public void onConfigurationChange(Configuration newConf) {
+ super.onConfigurationChange(newConf);
+ configureNettyWatermarks(newConf);
+ }
+
+ private void configureNettyWatermarks(Configuration conf) {
+ int watermarkLow =
+ conf.getInt(CHANNEL_WRITABLE_LOW_WATERMARK_KEY,
CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT);
+ int watermarkHigh =
+ conf.getInt(CHANNEL_WRITABLE_HIGH_WATERMARK_KEY,
CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT);
+ int fatalThreshold =
+ conf.getInt(CHANNEL_WRITABLE_FATAL_WATERMARK_KEY,
CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT);
+
+ WriteBufferWaterMark oldWaterMark = writeBufferWaterMark;
+ int oldFatalThreshold = writeBufferFatalThreshold;
+
+ boolean disabled = false;
+ if (watermarkHigh == 0 && watermarkLow == 0) {
+ // if both are 0, use the netty default, which we will treat as
"disabled".
+ // when disabled, we won't manage autoRead in response to writability
changes.
+ writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
+ disabled = true;
+ } else {
+ // netty checks pendingOutboundBytes < watermarkLow. It can never be
less than 0, so set to
+ // 1 to avoid confusing behavior.
+ if (watermarkLow == 0) {
+ LOG.warn(
+ "Detected a {} value of 0, which is impossible to achieve "
+ + "due to how netty evaluates these thresholds, setting to 1",
+ CHANNEL_WRITABLE_LOW_WATERMARK_KEY);
+ watermarkLow = 1;
+ }
+
+ // netty validates the watermarks and throws an exception if high < low,
fail more gracefully
+ // by disabling the watermarks and warning.
+ if (watermarkHigh <= watermarkLow) {
+ LOG.warn(
+ "Detected {} value {}, lower than {} value {}. This will fail netty
validation, "
+ + "so disabling",
+ CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, watermarkHigh,
CHANNEL_WRITABLE_LOW_WATERMARK_KEY,
+ watermarkLow);
+ writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
+ } else {
+ writeBufferWaterMark = new WriteBufferWaterMark(watermarkLow,
watermarkHigh);
+ }
+
+ // only apply this check when watermark is enabled. this way we give the
operator some
+ // flexibility if they want to try enabling fatal threshold without
backpressure.
+ if (fatalThreshold > 0 && fatalThreshold <= watermarkHigh) {
+ LOG.warn("Detected a {} value of {}, which is lower than the {} value
of {}, ignoring.",
+ CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, fatalThreshold,
CHANNEL_WRITABLE_HIGH_WATERMARK_KEY,
+ watermarkHigh);
+ fatalThreshold = 0;
+ }
+ }
+
+ writeBufferFatalThreshold = fatalThreshold;
+
+ if (
+ oldWaterMark != null && (oldWaterMark.low() != writeBufferWaterMark.low()
+ || oldWaterMark.high() != writeBufferWaterMark.high()
+ || oldFatalThreshold != writeBufferFatalThreshold)
+ ) {
+ LOG.info("Updated netty outbound write buffer watermarks: low={},
high={}, fatal={}",
+ disabled ? "disabled" : writeBufferWaterMark.low(),
+ disabled ? "disabled" : writeBufferWaterMark.high(),
+ writeBufferFatalThreshold <= 0 ? "disabled" :
writeBufferFatalThreshold);
+ }
+
+ // update any existing channels
+ for (Channel channel : allChannels) {
+ channel.config().setWriteBufferWaterMark(writeBufferWaterMark);
+ // if disabling watermark, set auto read to true in case channel had
been exceeding
+ // previous watermark
+ if (disabled) {
+ channel.config().setAutoRead(true);
+ }
+ }
+ }
+
+ public boolean isWritabilityBackpressureEnabled() {
+ return writeBufferWaterMark != WriteBufferWaterMark.DEFAULT;
+ }
+
private ByteBufAllocator getChannelAllocator(Configuration conf) throws
IOException {
final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
if (value != null) {
@@ -179,10 +312,10 @@ public class NettyRpcServer extends RpcServer {
}
}
- // will be overriden in tests
+ // will be overridden in tests
@InterfaceAudience.Private
- protected NettyRpcServerPreambleHandler
createNettyRpcServerPreambleHandler() {
- return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
+ protected NettyServerRpcConnection createNettyServerRpcConnection(Channel
channel) {
+ return new NettyServerRpcConnection(NettyRpcServer.this, channel);
}
@Override
@@ -320,4 +453,19 @@ public class NettyRpcServer extends RpcServer {
}
return result;
}
+
+ public int getWriteBufferFatalThreshold() {
+ return writeBufferFatalThreshold;
+ }
+
+ public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() {
+ long total = 0;
+ long max = 0;
+ for (Channel channel : allChannels) {
+ long outbound = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel);
+ total += outbound;
+ max = Math.max(max, outbound);
+ }
+ return Pair.newPair(total, max);
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerChannelWritabilityHandler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerChannelWritabilityHandler.java
new file mode 100644
index 00000000000..4b0b3878da8
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerChannelWritabilityHandler.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.function.BooleanSupplier;
+import java.util.function.IntSupplier;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NettyUnsafeUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
+import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
+
+/**
+ * Handler to enforce writability protections on our server channels: <br>
+ * - Responds to channel writability events, which are triggered when the
total pending bytes for a
+ * channel passes configured high and low watermarks. When high watermark is
exceeded, the channel
+ * is setAutoRead(false). This way, we won't accept new requests from the
client until some pending
+ * outbound bytes are successfully received by the client.<br>
+ * - Pre-processes any channel write requests. If the total pending outbound
bytes exceeds a fatal
+ * threshold, the channel is forcefully closed and the write is set to failed.
This handler should
+ * be the last handler in the pipeline so that it's the first handler to
receive any messages sent
+ * to channel.write() or channel.writeAndFlush().
+ */
[email protected]
+public class NettyRpcServerChannelWritabilityHandler extends
ChannelDuplexHandler {
+
+ static final String NAME = "NettyRpcServerChannelWritabilityHandler";
+
+ private final MetricsHBaseServer metrics;
+ private final IntSupplier pendingBytesFatalThreshold;
+ private final BooleanSupplier isWritabilityBackpressureEnabled;
+
+ private boolean writable = true;
+ private long unwritableStartTime;
+
+ NettyRpcServerChannelWritabilityHandler(MetricsHBaseServer metrics,
+ IntSupplier pendingBytesFatalThreshold, BooleanSupplier
isWritabilityBackpressureEnabled) {
+ this.metrics = metrics;
+ this.pendingBytesFatalThreshold = pendingBytesFatalThreshold;
+ this.isWritabilityBackpressureEnabled = isWritabilityBackpressureEnabled;
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise)
+ throws Exception {
+ if (handleFatalThreshold(ctx)) {
+ promise.setFailure(
+ new ConnectionClosedException("Channel outbound bytes exceeded fatal
threshold"));
+ if (msg instanceof RpcResponse) {
+ ((RpcResponse) msg).done();
+ } else {
+ ReferenceCountUtil.release(msg);
+ }
+ return;
+ }
+ ctx.write(msg, promise);
+ }
+
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws
Exception {
+ if (isWritabilityBackpressureEnabled.getAsBoolean()) {
+ handleWritabilityChanged(ctx);
+ }
+ ctx.fireChannelWritabilityChanged();
+ }
+
+ private boolean handleFatalThreshold(ChannelHandlerContext ctx) {
+ int fatalThreshold = pendingBytesFatalThreshold.getAsInt();
+ if (fatalThreshold <= 0) {
+ return false;
+ }
+
+ Channel channel = ctx.channel();
+ long outboundBytes =
NettyUnsafeUtils.getTotalPendingOutboundBytes(channel);
+ if (outboundBytes < fatalThreshold) {
+ return false;
+ }
+
+ if (channel.isOpen()) {
+ metrics.maxOutboundBytesExceeded();
+ RpcServer.LOG.warn(
+ "{}: Closing connection because outbound buffer size of {} exceeds
fatal threshold of {}",
+ channel.remoteAddress(), outboundBytes, fatalThreshold);
+ NettyUnsafeUtils.closeImmediately(channel);
+ }
+
+ return true;
+ }
+
+ private void handleWritabilityChanged(ChannelHandlerContext ctx) {
+ boolean oldWritableValue = this.writable;
+
+ this.writable = ctx.channel().isWritable();
+ ctx.channel().config().setAutoRead(this.writable);
+
+ if (!oldWritableValue && this.writable) {
+ // changing from not writable to writable, update metrics
+ metrics.unwritableTime(EnvironmentEdgeManager.currentTime() -
unwritableStartTime);
+ unwritableStartTime = 0;
+ } else if (oldWritableValue && !this.writable) {
+ // changing from writable to non-writable, set start time
+ unwritableStartTime = EnvironmentEdgeManager.currentTime();
+ }
+ }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
index 8269bbc60d8..b79a67f986e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import
org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
@@ -38,14 +37,15 @@ class NettyRpcServerPreambleHandler extends
SimpleChannelInboundHandler<ByteBuf>
static final String DECODER_NAME = "preambleDecoder";
private final NettyRpcServer rpcServer;
+ private final NettyServerRpcConnection conn;
- public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer) {
+ public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer,
NettyServerRpcConnection conn) {
this.rpcServer = rpcServer;
+ this.conn = conn;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws
Exception {
- NettyServerRpcConnection conn =
createNettyServerRpcConnection(ctx.channel());
ByteBuffer buf = ByteBuffer.allocate(msg.readableBytes());
msg.readBytes(buf);
buf.flip();
@@ -76,9 +76,4 @@ class NettyRpcServerPreambleHandler extends
SimpleChannelInboundHandler<ByteBuf>
ctx.channel().remoteAddress(), cause);
NettyFutureUtils.safeClose(ctx);
}
-
- // will be overridden in tests
- protected NettyServerRpcConnection createNettyServerRpcConnection(Channel
channel) {
- return new NettyServerRpcConnection(rpcServer, channel);
- }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
index fd0c6d75d88..4f0540da80a 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
@@ -54,6 +54,6 @@ class NettyServerCall extends
ServerCall<NettyServerRpcConnection> {
public synchronized void sendResponseIfReady() throws IOException {
// set param null to reduce memory pressure
this.param = null;
- connection.channel.writeAndFlush(this);
+ connection.doRespond(this);
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java
index d5c408c2387..da4f70e3a24 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java
@@ -49,12 +49,7 @@ public class FailingNettyRpcServer extends NettyRpcServer {
}
@Override
- protected NettyRpcServerPreambleHandler
createNettyRpcServerPreambleHandler() {
- return new NettyRpcServerPreambleHandler(FailingNettyRpcServer.this) {
- @Override
- protected NettyServerRpcConnection
createNettyServerRpcConnection(Channel channel) {
- return new FailingConnection(FailingNettyRpcServer.this, channel);
- }
- };
+ protected NettyServerRpcConnection createNettyServerRpcConnection(Channel
channel) {
+ return new FailingConnection(FailingNettyRpcServer.this, channel);
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
index 6e5dfe87fc7..7170413bee9 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.ipc;
+import org.apache.hadoop.hbase.util.Pair;
+
public class MetricsHBaseServerWrapperStub implements
MetricsHBaseServerWrapper {
@Override
public long getTotalQueueSize() {
@@ -127,4 +129,9 @@ public class MetricsHBaseServerWrapperStub implements
MetricsHBaseServerWrapper
public int getActiveMetaPriorityRpcHandlerCount() {
return 1;
}
+
+ @Override
+ public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() {
+ return Pair.newPair(100L, 5L);
+ }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.java
new file mode 100644
index 00000000000..001f6dbd22c
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static
org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompatibilityFactory;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.test.MetricsAssertHelper;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
+import
org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
+
+@Category({ RPCTests.class, MediumTests.class })
+public class TestNettyChannelWritability {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestNettyChannelWritability.class);
+
+ private static final MetricsAssertHelper METRICS_ASSERT =
+ CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+
+ private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
+ private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES,
CELL_BYTES, CELL_BYTES);
+
+ /**
+ * Test that we properly send configured watermarks to netty, and trigger
setWritable when
+ * necessary.
+ */
+ @Test
+ public void testNettyWritableWatermarks() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 1);
+ conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 2);
+
+ NettyRpcServer rpcServer = createRpcServer(conf, 0);
+ try {
+ sendAndReceive(conf, rpcServer, 5);
+ METRICS_ASSERT.assertCounterGt("unwritableTime_numOps", 0,
+ rpcServer.metrics.getMetricsSource());
+ } finally {
+ rpcServer.stop();
+ }
+ }
+
+ /**
+ * Test that our fatal watermark is honored, which requires artificially
causing some queueing so
+ * that pendingOutboundBytes increases.
+ */
+ @Test
+ public void testNettyWritableFatalThreshold() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, 1);
+
+ // flushAfter is 3 here, with requestCount 5 below. If we never flush, the
WriteTasks will sit
+ // in the eventloop. So we flush a few at once, which will ensure that we
hit fatal threshold
+ NettyRpcServer rpcServer = createRpcServer(conf, 3);
+ try {
+ CompletionException exception =
+ assertThrows(CompletionException.class, () -> sendAndReceive(conf,
rpcServer, 5));
+ assertTrue(exception.getCause().getCause() instanceof ServiceException);
+ METRICS_ASSERT.assertCounterGt("maxOutboundBytesExceeded", 0,
+ rpcServer.metrics.getMetricsSource());
+ } finally {
+ rpcServer.stop();
+ }
+ }
+
+ private void sendAndReceive(Configuration conf, NettyRpcServer rpcServer,
int requestCount)
+ throws Exception {
+ List<Cell> cells = new ArrayList<>();
+ int count = 3;
+ for (int i = 0; i < count; i++) {
+ cells.add(CELL);
+ }
+
+ try (NettyRpcClient client = new NettyRpcClient(conf)) {
+ rpcServer.start();
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+ newBlockingStub(client, rpcServer.getListenerAddress());
+ CompletableFuture<Void>[] futures = new CompletableFuture[requestCount];
+ for (int i = 0; i < requestCount; i++) {
+ futures[i] = CompletableFuture.runAsync(() -> {
+ try {
+ sendMessage(cells, stub);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ CompletableFuture.allOf(futures).join();
+ }
+ }
+
+ private void sendMessage(List<Cell> cells,
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) throws
Exception {
+ HBaseRpcController pcrc = new
HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
+ String message = "hello";
+ assertEquals(message,
+ stub.echo(pcrc,
TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
+ .getMessage());
+ int index = 0;
+ CellScanner cellScanner = pcrc.cellScanner();
+ assertNotNull(cellScanner);
+ while (cellScanner.advance()) {
+ assertEquals(CELL, cellScanner.current());
+ index++;
+ }
+ assertEquals(cells.size(), index);
+ }
+
+ private NettyRpcServer createRpcServer(Configuration conf, int flushAfter)
throws IOException {
+ String name = "testRpcServer";
+ ArrayList<RpcServer.BlockingServiceAndInterface> services =
+ Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE,
null));
+
+ InetSocketAddress bindAddress = new InetSocketAddress("localhost", 0);
+ FifoRpcScheduler scheduler = new FifoRpcScheduler(conf, 1);
+
+ AtomicInteger writeCount = new AtomicInteger(0);
+
+ return new NettyRpcServer(null, name, services, bindAddress, conf,
scheduler, true) {
+ @Override
+ protected NettyServerRpcConnection
createNettyServerRpcConnection(Channel channel) {
+ return new NettyServerRpcConnection(this, channel) {
+ @Override
+ protected void doRespond(RpcResponse resp) {
+ if (writeCount.incrementAndGet() >= flushAfter) {
+ super.doRespond(resp);
+ } else {
+ channel.write(resp);
+ }
+ }
+ };
+ }
+ };
+ }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
index 288bb3fe262..c55568d392a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
@@ -89,6 +89,9 @@ public class TestRpcMetrics {
HELPER.assertGauge("numCallsInWriteQueue", 50, serverSource);
HELPER.assertGauge("numCallsInReadQueue", 50, serverSource);
HELPER.assertGauge("numCallsInScanQueue", 2, serverSource);
+ HELPER.assertGauge("nettyDirectMemoryUsage", 100, serverSource);
+ HELPER.assertGauge("nettyTotalPendingOutboundBytes", 100, serverSource);
+ HELPER.assertGauge("nettyMaxPendingOutboundBytes", 5, serverSource);
}
/**
@@ -100,6 +103,12 @@ public class TestRpcMetrics {
new MetricsHBaseServer("HMaster", new MetricsHBaseServerWrapperStub());
MetricsHBaseServerSource serverSource = mrpc.getMetricsSource();
+ mrpc.unwritableTime(100);
+ mrpc.maxOutboundBytesExceeded();
+ mrpc.maxOutboundBytesExceeded();
+ HELPER.assertCounter("maxOutboundBytesExceeded", 2, serverSource);
+ HELPER.assertCounter("unwritableTime_NumOps", 1, serverSource);
+
for (int i = 0; i < 12; i++) {
mrpc.authenticationFailure();
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
index c47cceeb76a..0506f12666a 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
@@ -28,7 +28,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -49,9 +49,7 @@ import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
-import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import
org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
@@ -131,29 +129,15 @@ public class TestRpcSkipInitialSaslHandshake {
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
- final AtomicBoolean useSaslRef = new AtomicBoolean(false);
+ final AtomicReference<NettyServerRpcConnection> conn = new
AtomicReference<>(null);
NettyRpcServer rpcServer = new NettyRpcServer(null,
getClass().getSimpleName(),
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE,
null)),
new InetSocketAddress(HOST, 0), serverConf, new
FifoRpcScheduler(serverConf, 1), true) {
@Override
- protected NettyRpcServerPreambleHandler
createNettyRpcServerPreambleHandler() {
- return new NettyRpcServerPreambleHandler(this) {
- private NettyServerRpcConnection conn;
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
throws Exception {
- super.channelRead0(ctx, msg);
- useSaslRef.set(conn.useSasl);
-
- }
-
- @Override
- protected NettyServerRpcConnection
createNettyServerRpcConnection(Channel channel) {
- conn = super.createNettyServerRpcConnection(channel);
- return conn;
- }
- };
+ protected NettyServerRpcConnection
createNettyServerRpcConnection(Channel channel) {
+ conn.set(super.createNettyServerRpcConnection(channel));
+ return conn.get();
}
};
@@ -167,7 +151,7 @@ public class TestRpcSkipInitialSaslHandshake {
stub.echo(null,
TestProtos.EchoRequestProto.newBuilder().setMessage("test").build())
.getMessage();
assertTrue("test".equals(response));
- assertFalse(useSaslRef.get());
+ assertFalse(conn.get().useSasl);
} finally {
rpcServer.stop();