This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 004cd012e1 HttpClient: Include error handler on all connection
attempts. (#14915)
004cd012e1 is described below
commit 004cd012e1cf94f8bb948d7c350f8c06df44af72
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Aug 29 01:58:04 2023 -0700
HttpClient: Include error handler on all connection attempts. (#14915)
Currently we have an error handler for HTTPS connection attempts, but
not for plaintext (HTTP) connection attempts. This leads to warnings like
the following for plaintext connection errors:
"EXCEPTION, please implement
org.jboss.netty.handler.codec.http.HttpContentDecompressor.exceptionCaught()
for proper handling."
This happens because, if we don't add our own error handler, the last
handler in the pipeline during a connection attempt is HttpContentDecompressor,
which doesn't handle errors.
The new error handler for plaintext doesn't do much: it just closes
the channel.
---
.../http/client/pool/ChannelResourceFactory.java | 87 ++++++++++++++++------
.../java/util/http/client/JankyServersTest.java | 72 ++++++++++++++++++
2 files changed, 135 insertions(+), 24 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java
b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java
index d6465be305..c679894588 100644
---
a/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java
+++
b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java
@@ -44,6 +44,7 @@ import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.util.Timer;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
@@ -53,13 +54,15 @@ import java.net.URL;
import java.util.concurrent.TimeUnit;
/**
+ *
*/
public class ChannelResourceFactory implements ResourceFactory<String,
ChannelFuture>
{
private static final Logger log = new Logger(ChannelResourceFactory.class);
private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS =
TimeUnit.SECONDS.toMillis(10);
- private static final String DRUID_PROXY_HANDLER = "druid_proxyHandler";
+ private static final String PROXY_HANDLER_NAME = "druid-proxy";
+ private static final String ERROR_HANDLER_NAME = "druid-connection-error";
private final ClientBootstrap bootstrap;
private final SSLContext sslContext;
@@ -128,7 +131,7 @@ public class ChannelResourceFactory implements
ResourceFactory<String, ChannelFu
if (f1.isSuccess()) {
final Channel channel = f1.getChannel();
channel.getPipeline().addLast(
- DRUID_PROXY_HANDLER,
+ PROXY_HANDLER_NAME,
new SimpleChannelUpstreamHandler()
{
@Override
@@ -137,7 +140,7 @@ public class ChannelResourceFactory implements
ResourceFactory<String, ChannelFu
Object msg = e.getMessage();
final ChannelPipeline pipeline = ctx.getPipeline();
- pipeline.remove(DRUID_PROXY_HANDLER);
+ pipeline.remove(PROXY_HANDLER_NAME);
if (msg instanceof HttpResponse) {
HttpResponse httpResponse = (HttpResponse) msg;
@@ -217,27 +220,7 @@ public class ChannelResourceFactory implements
ResourceFactory<String, ChannelFu
sslHandler.setCloseOnSSLException(true);
final ChannelFuture handshakeFuture =
Channels.future(connectFuture.getChannel());
- connectFuture.getChannel().getPipeline().addLast(
- "connectionErrorHandler", new SimpleChannelUpstreamHandler()
- {
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx,
ExceptionEvent e)
- {
- final Channel channel = ctx.getChannel();
- if (channel == null) {
- // For the case where this pipeline is not attached yet.
- handshakeFuture.setFailure(new ChannelException(
- StringUtils.format("Channel is null. The context name is
[%s]", ctx.getName())
- ));
- return;
- }
- handshakeFuture.setFailure(e.getCause());
- if (channel.isOpen()) {
- channel.close();
- }
- }
- }
- );
+ connectFuture.getChannel().getPipeline().addLast(ERROR_HANDLER_NAME, new
ConnectionErrorHandler(handshakeFuture));
connectFuture.addListener(
new ChannelFutureListener()
{
@@ -280,6 +263,7 @@ public class ChannelResourceFactory implements
ResourceFactory<String, ChannelFu
retVal = handshakeFuture;
} else {
+ connectFuture.getChannel().getPipeline().addLast(ERROR_HANDLER_NAME, new
ConnectionErrorHandler(null));
retVal = connectFuture;
}
@@ -308,4 +292,59 @@ public class ChannelResourceFactory implements
ResourceFactory<String, ChannelFu
log.trace("Closing");
resource.awaitUninterruptibly().getChannel().close();
}
+
+ /**
+ * Handler that captures errors that occur while connecting. Typically
superseded by other handlers after
+ * a connection happens, in {@link
org.apache.druid.java.util.http.client.NettyHttpClient}.
+ *
+ * It's important to have this for all channels, even if {@link #future} is
null, because otherwise exceptions
+ * that occur during connection land at {@link
org.jboss.netty.handler.codec.http.HttpContentDecompressor} (the last
+ * handler from {@link
org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory}) and
are dropped on
+ * the floor along with a scary-looking warning like "EXCEPTION, please
implement
+ *
org.jboss.netty.handler.codec.http.HttpContentDecompressor.exceptionCaught()
for proper handling."
+ */
+ private static class ConnectionErrorHandler extends
SimpleChannelUpstreamHandler
+ {
+ @Nullable
+ private final ChannelFuture future;
+
+ /**
+ * Constructor.
+ *
+ * @param future future to attach errors to
+ */
+ public ConnectionErrorHandler(@Nullable ChannelFuture future)
+ {
+ this.future = future;
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final
ExceptionEvent e)
+ {
+ final Channel channel = ctx.getChannel();
+ if (channel == null) {
+ // For the case where this pipeline is not attached yet.
+ if (future != null && !future.isDone()) {
+ final ChannelException e2 =
+ new ChannelException(StringUtils.format("Channel is null. The
context name is [%s]", ctx.getName()));
+ e2.addSuppressed(e.getCause());
+ future.setFailure(e2);
+ }
+ return;
+ }
+
+ if (future != null && !future.isDone()) {
+ future.setFailure(e.getCause());
+ }
+
+ // Close the channel if this is the last handler. Otherwise, we expect
that NettyHttpClient would have added
+ // additional handlers to take care of the errors.
+ //noinspection ObjectEquality
+ if (channel.isOpen() && this == ctx.getPipeline().getLast()) {
+ channel.close();
+ }
+
+ ctx.sendUpstream(e);
+ }
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java
b/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java
index 3d12cf7a1f..ec54bd1350 100644
---
a/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java
@@ -296,6 +296,78 @@ public class JankyServersTest
}
}
+ @Test
+ public void testHttpConnectionRefused() throws Throwable
+ {
+ final Lifecycle lifecycle = new Lifecycle();
+ try {
+ final HttpClientConfig config =
HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
+ final HttpClient client = HttpClientInit.createClient(config, lifecycle);
+
+ // Need to select a port that isn't being listened to. This approach
finds an unused port in a racey way.
+ // Hopefully it works most of the time.
+ final ServerSocket sock = new ServerSocket(0);
+ final int port = sock.getLocalPort();
+ sock.close();
+
+ final ListenableFuture<StatusResponseHolder> response = client
+ .go(
+ new Request(HttpMethod.GET, new
URL(StringUtils.format("http://localhost:%d/", port))),
+ StatusResponseHandler.getInstance()
+ );
+
+ Throwable e = null;
+ try {
+ response.get();
+ }
+ catch (ExecutionException e1) {
+ e = e1.getCause();
+ e1.printStackTrace();
+ }
+
+ Assert.assertTrue("ChannelException thrown by 'get'",
isChannelClosedException(e));
+ }
+ finally {
+ lifecycle.stop();
+ }
+ }
+
+ @Test
+ public void testHttpsConnectionRefused() throws Throwable
+ {
+ final Lifecycle lifecycle = new Lifecycle();
+ try {
+ final HttpClientConfig config =
HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
+ final HttpClient client = HttpClientInit.createClient(config, lifecycle);
+
+ // Need to select a port that isn't being listened to. This approach
finds an unused port in a racey way.
+ // Hopefully it works most of the time.
+ final ServerSocket sock = new ServerSocket(0);
+ final int port = sock.getLocalPort();
+ sock.close();
+
+ final ListenableFuture<StatusResponseHolder> response = client
+ .go(
+ new Request(HttpMethod.GET, new
URL(StringUtils.format("https://localhost:%d/", port))),
+ StatusResponseHandler.getInstance()
+ );
+
+ Throwable e = null;
+ try {
+ response.get();
+ }
+ catch (ExecutionException e1) {
+ e = e1.getCause();
+ e1.printStackTrace();
+ }
+
+ Assert.assertTrue("ChannelException thrown by 'get'",
isChannelClosedException(e));
+ }
+ finally {
+ lifecycle.stop();
+ }
+ }
+
public boolean isChannelClosedException(Throwable e)
{
return e instanceof ChannelException ||
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]