This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new bd95b42 Fix missing exception handling as part of
`io.druid.java.util.http.client.netty.HttpClientPipelineFactory` (#6090)
bd95b42 is described below
commit bd95b426c9171d4f27b06197a2a3a1cd21ffaac0
Author: Benedict Jin <[email protected]>
AuthorDate: Sat Aug 11 08:02:53 2018 +0800
Fix missing exception handling as part of
`io.druid.java.util.http.client.netty.HttpClientPipelineFactory` (#6090)
* Fix missing exception handling as part of
`io.druid.java.util.http.client.netty.HttpClientPipelineFactory`
* 1. Extends SimpleChannelUpstreamHandler; 2. Remove sendUpstream; 3. Using
ExpectedException.
* Add more checks for channel
* Fix missing exception handler in NettyHttpClient and
ChannelResourceFactory
* Rename the anonymous class of `SimpleChannelUpstreamHandler` as
connectionErrorHandler & use `addLast` instead of `addFirst`
* Remove `removeHandlers()`
* Using expectedException.expect instead of Assert.assertNotNull in
testHttpsEchoServer
* Using handshakeFuture.setFailure instead of logger
* Using handshakeFuture.setFailure instead of logger
---
.../java/util/http/client/NettyHttpClient.java | 10 ++++----
.../http/client/pool/ChannelResourceFactory.java | 22 +++++++++++++++++
.../java/util/http/client/JankyServersTest.java | 28 +++++++++-------------
3 files changed, 37 insertions(+), 23 deletions(-)
diff --git
a/java-util/src/main/java/io/druid/java/util/http/client/NettyHttpClient.java
b/java-util/src/main/java/io/druid/java/util/http/client/NettyHttpClient.java
index 7058f3f..c436cb5 100644
---
a/java-util/src/main/java/io/druid/java/util/http/client/NettyHttpClient.java
+++
b/java-util/src/main/java/io/druid/java/util/http/client/NettyHttpClient.java
@@ -278,18 +278,17 @@ public class NettyHttpClient extends AbstractHttpClient
if (response != null) {
handler.exceptionCaught(response, event.getCause());
}
- removeHandlers();
try {
- channel.close();
+ if (channel.isOpen()) {
+ channel.close();
+ }
}
catch (Exception e) {
- // ignore
+ log.warn(e, "Error while closing channel");
}
finally {
channelResourceContainer.returnResource();
}
-
- context.sendUpstream(event);
}
@Override
@@ -308,7 +307,6 @@ public class NettyHttpClient extends AbstractHttpClient
log.warn("[%s] Channel disconnected before response complete",
requestDesc);
retVal.setException(new ChannelException("Channel
disconnected"));
}
- context.sendUpstream(event);
}
private void removeHandlers()
diff --git
a/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
b/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
index 0c8c39d..04124b2 100644
---
a/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
+++
b/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
@@ -27,8 +27,11 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.util.Timer;
@@ -111,6 +114,25 @@ public class ChannelResourceFactory implements
ResourceFactory<String, ChannelFu
pipeline.addFirst("ssl", sslHandler);
final ChannelFuture handshakeFuture =
Channels.future(connectFuture.getChannel());
+ pipeline.addLast("connectionErrorHandler", new
SimpleChannelUpstreamHandler()
+ {
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent
e)
+ {
+ final Channel channel = ctx.getChannel();
+ if (channel == null) {
+ // For the case where this pipeline is not attached yet.
+ handshakeFuture.setFailure(new ChannelException(
+ StringUtils.format("Channel is null. The context name is
[%s]", ctx.getName())
+ ));
+ return;
+ }
+ handshakeFuture.setFailure(e.getCause());
+ if (channel.isOpen()) {
+ channel.close();
+ }
+ }
+ });
connectFuture.addListener(
new ChannelFutureListener()
{
diff --git
a/java-util/src/test/java/io/druid/java/util/http/client/JankyServersTest.java
b/java-util/src/test/java/io/druid/java/util/http/client/JankyServersTest.java
index bdc8a2a..0700e85 100644
---
a/java-util/src/test/java/io/druid/java/util/http/client/JankyServersTest.java
+++
b/java-util/src/test/java/io/druid/java/util/http/client/JankyServersTest.java
@@ -31,7 +31,9 @@ import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import javax.net.ssl.SSLContext;
import java.io.IOException;
@@ -55,6 +57,9 @@ public class JankyServersTest
static ServerSocket echoServerSocket;
static ServerSocket closingServerSocket;
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
@BeforeClass
public static void setUp() throws Exception
{
@@ -309,16 +314,10 @@ public class JankyServersTest
new StatusResponseHandler(StandardCharsets.UTF_8)
);
- Throwable e = null;
- try {
- response.get();
- }
- catch (ExecutionException e1) {
- e = e1.getCause();
- }
+ expectedException.expect(ExecutionException.class);
+ expectedException.expectMessage("java.lang.IllegalArgumentException:
invalid version format: GET");
- Assert.assertTrue("IllegalArgumentException thrown by 'get'", e
instanceof IllegalArgumentException);
- Assert.assertTrue("Expected error message",
e.getMessage().matches(".*invalid version format:.*"));
+ response.get();
}
finally {
lifecycle.stop();
@@ -339,15 +338,10 @@ public class JankyServersTest
new StatusResponseHandler(StandardCharsets.UTF_8)
);
- Throwable e = null;
- try {
- response.get();
- }
- catch (ExecutionException e1) {
- e = e1.getCause();
- }
+ expectedException.expect(ExecutionException.class);
+
expectedException.expectMessage("org.jboss.netty.channel.ChannelException:
Faulty channel in resource pool");
- Assert.assertNotNull("ChannelException thrown by 'get'", e);
+ response.get();
}
finally {
lifecycle.stop();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]