anmolnar commented on code in PR #4125:
URL: https://github.com/apache/hbase/pull/4125#discussion_r900298904
##########
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java:
##########
@@ -342,14 +366,74 @@ public void operationComplete(ChannelFuture future)
throws Exception {
});
}
+ private void writeAndFlushToChannel(Call call, Channel ch) {
+ if (ch == null) {
+ return;
+ }
+
+ scheduleTimeoutTask(call);
+ ch.writeAndFlush(call).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ // Fail the call if we failed to write it out. This usually because
the channel is
+ // closed. This is needed because we may shutdown the channel inside
event loop and
+ // there may still be some pending calls in the event loop queue after
us.
+ if (!future.isSuccess()) {
+ call.setException(toIOE(future.cause()));
+ }
+ }
+ });
+ }
+
@Override
public void sendRequest(final Call call, HBaseRpcController hrc) {
- execute(eventLoop, () -> {
- try {
- sendRequest0(call, hrc);
- } catch (Exception e) {
- call.setException(toIOE(e));
+ try {
+ sendRequest0(call, hrc);
+ } catch (Exception e) {
+ call.setException(toIOE(e));
+ }
+ }
+
+ /**
+ * HBaseClientPipelineFactory is the netty pipeline factory for this netty
connection
+ * implementation.
+ */
+ private static class HBaseClientPipelineFactory extends
ChannelInitializer<SocketChannel> {
+
+ private SSLContext sslContext = null;
+ private SSLEngine sslEngine = null;
+ private final String host;
+ private final int port;
+ private final Configuration conf;
+
+ public HBaseClientPipelineFactory(String host, int port, Configuration
conf) {
+ this.host = host;
+ this.port = port;
+ this.conf = conf;
+ }
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws
X509Exception.SSLContextException {
+ ChannelPipeline pipeline = ch.pipeline();
+ if (conf.getBoolean(HBASE_NETTY_RPCSERVER_TLS_ENABLED, false)) {
+ initSSL(pipeline);
}
- });
+ pipeline.addLast("handler", new BufferCallBeforeInitHandler());
+ }
+
+ // The synchronized is to prevent the race on shared variable "sslEngine".
+ // Basically we only need to create it once.
+ private synchronized void initSSL(ChannelPipeline pipeline)
+ throws X509Exception.SSLContextException {
+ if (sslContext == null || sslEngine == null) {
+ X509Util x509Util = new X509Util(conf);
+ sslContext = x509Util.createSSLContextAndOptions().getSSLContext();
+ sslEngine = sslContext.createSSLEngine(host, port);
+ sslEngine.setUseClientMode(true);
+ LOG.debug("SSL engine initialized");
+ }
+ pipeline.addLast("ssl", new SslHandler(sslEngine));
Review Comment:
Sure, that's a good suggestion. I'll add a new config item.
##########
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java:
##########
@@ -342,14 +366,74 @@ public void operationComplete(ChannelFuture future)
throws Exception {
});
}
+ private void writeAndFlushToChannel(Call call, Channel ch) {
+ if (ch == null) {
+ return;
+ }
+
+ scheduleTimeoutTask(call);
+ ch.writeAndFlush(call).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ // Fail the call if we failed to write it out. This usually because
the channel is
+ // closed. This is needed because we may shutdown the channel inside
event loop and
+ // there may still be some pending calls in the event loop queue after
us.
+ if (!future.isSuccess()) {
+ call.setException(toIOE(future.cause()));
+ }
+ }
+ });
+ }
+
@Override
public void sendRequest(final Call call, HBaseRpcController hrc) {
- execute(eventLoop, () -> {
- try {
- sendRequest0(call, hrc);
- } catch (Exception e) {
- call.setException(toIOE(e));
+ try {
+ sendRequest0(call, hrc);
+ } catch (Exception e) {
+ call.setException(toIOE(e));
+ }
+ }
+
+ /**
+ * HBaseClientPipelineFactory is the netty pipeline factory for this netty
connection
+ * implementation.
+ */
+ private static class HBaseClientPipelineFactory extends
ChannelInitializer<SocketChannel> {
+
+ private SSLContext sslContext = null;
+ private SSLEngine sslEngine = null;
+ private final String host;
+ private final int port;
+ private final Configuration conf;
+
+ public HBaseClientPipelineFactory(String host, int port, Configuration
conf) {
+ this.host = host;
+ this.port = port;
+ this.conf = conf;
+ }
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws
X509Exception.SSLContextException {
+ ChannelPipeline pipeline = ch.pipeline();
+ if (conf.getBoolean(HBASE_NETTY_RPCSERVER_TLS_ENABLED, false)) {
+ initSSL(pipeline);
}
- });
+ pipeline.addLast("handler", new BufferCallBeforeInitHandler());
+ }
+
+ // The synchronized is to prevent the race on shared variable "sslEngine".
+ // Basically we only need to create it once.
+ private synchronized void initSSL(ChannelPipeline pipeline)
+ throws X509Exception.SSLContextException {
+ if (sslContext == null || sslEngine == null) {
+ X509Util x509Util = new X509Util(conf);
+ sslContext = x509Util.createSSLContextAndOptions().getSSLContext();
+ sslEngine = sslContext.createSSLEngine(host, port);
+ sslEngine.setUseClientMode(true);
+ LOG.debug("SSL engine initialized");
+ }
+ pipeline.addLast("ssl", new SslHandler(sslEngine));
Review Comment:
Sure, that's a good suggestion. I'll add a new config item.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]