Repository: flume Updated Branches: refs/heads/trunk 964bcf56a -> c570a51b3
Fix HttpSink bad response handling After a bad response, connection.getInputStream() returns null. This patch adds a check for this. This closes #139 Reviewers: Bessenyei Balázs Donát (filippovmn via Bessenyei Balázs Donát) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c570a51b Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c570a51b Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c570a51b Branch: refs/heads/trunk Commit: c570a51b3c53e4899d16dd623e19a0d939518dd2 Parents: 964bcf5 Author: filippovmn <[email protected]> Authored: Wed Jul 12 18:51:26 2017 +0000 Committer: Bessenyei Balázs Donát <[email protected]> Committed: Wed Jul 12 18:55:20 2017 +0000 ---------------------------------------------------------------------- .../org/apache/flume/sink/http/HttpSink.java | 7 ++++- .../apache/flume/sink/http/TestHttpSink.java | 17 +++++++++++ .../apache/flume/sink/http/TestHttpSinkIT.java | 32 ++++++++++++++++++++ 3 files changed, 55 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/c570a51b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java index 9637326..b9c42ed 100644 --- a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java +++ b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java @@ -218,7 +218,12 @@ public class HttpSink extends AbstractSink implements Configurable { int httpStatusCode = connection.getResponseCode(); LOG.debug("Got status code : " + httpStatusCode); - connection.getInputStream().close(); + if (httpStatusCode < HttpURLConnection.HTTP_BAD_REQUEST) { + connection.getInputStream().close(); + } else { + LOG.debug("bad request"); + connection.getErrorStream().close(); + } LOG.debug("Response processed and closed"); if (httpStatusCode >= HTTP_STATUS_CONTINUE) { http://git-wip-us.apache.org/repos/asf/flume/blob/c570a51b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java index 16cb6e8..bee089c 100644 --- a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java +++ b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java @@ -216,6 +216,22 @@ public class TestHttpSink { } @Test + public void ensureSingleErrorStatusConfigurationCorrectlyUsed() throws Exception { + when(channel.take()).thenReturn(event); + when(event.getBody()).thenReturn("something".getBytes()); + + Context context = new Context(); + context.put("defaultRollback", "true"); + context.put("defaultBackoff", "true"); + context.put("defaultIncrementMetrics", "false"); + context.put("rollback.401", "false"); + context.put("backoff.401", "false"); + context.put("incrementMetrics.401", "false"); + + executeWithMocks(true, Status.READY, false, true, context, HttpURLConnection.HTTP_UNAUTHORIZED); + } + + @Test public void ensureGroupConfigurationCorrectlyUsed() throws Exception { when(channel.take()).thenReturn(event); when(event.getBody()).thenReturn("something".getBytes()); @@ -278,6 +294,7 @@ public class TestHttpSink { when(channel.getTransaction()).thenReturn(transaction); when(httpURLConnection.getOutputStream()).thenReturn(outputStream); when(httpURLConnection.getInputStream()).thenReturn(inputStream); + when(httpURLConnection.getErrorStream()).thenReturn(inputStream); when(httpURLConnection.getResponseCode()).thenReturn(httpStatus); Status actualStatus = httpSink.process(); http://git-wip-us.apache.org/repos/asf/flume/blob/c570a51b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java index 74dcf1d..f4fde57 100644 --- a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java +++ b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java @@ -71,6 +71,8 @@ public class TestHttpSinkIT { httpSinkContext.put("contentTypeHeader", "application/json"); httpSinkContext.put("backoff.200", "false"); httpSinkContext.put("rollback.200", "false"); + httpSinkContext.put("backoff.401", "false"); + httpSinkContext.put("rollback.401", "false"); httpSinkContext.put("incrementMetrics.200", "true"); Context memoryChannelContext = new Context(); @@ -132,6 +134,36 @@ public class TestHttpSinkIT { } @Test + public void ensureEventsNotResentOn401Failure() throws Exception { + String errorScenario = "Error skip scenario"; + + service.stubFor(post(urlEqualTo("/endpoint")) + .inScenario(errorScenario) + .whenScenarioStateIs(STARTED) + .withRequestBody(equalToJson(event("UNAUTHORIZED REQUEST"))) + .willReturn(aResponse().withStatus(401) + .withHeader("Content-Type", "text/plain") + .withBody("Not allowed!")) + .willSetStateTo("Error Sent")); + + service.stubFor(post(urlEqualTo("/endpoint")) + .inScenario(errorScenario) + .whenScenarioStateIs("Error Sent") + .withRequestBody(equalToJson(event("NEXT EVENT"))) + .willReturn(aResponse().withStatus(200))); + + addEventToChannel(event("UNAUTHORIZED REQUEST"), Status.READY); + addEventToChannel(event("NEXT EVENT"), Status.READY); + + service.verify(1, postRequestedFor(urlEqualTo("/endpoint")) + .withRequestBody(equalToJson(event("UNAUTHORIZED REQUEST")))); + + service.verify(1, postRequestedFor(urlEqualTo("/endpoint")) + .withRequestBody(equalToJson(event("NEXT EVENT")))); + + } + + @Test public void ensureEventsResentOnNetworkFailure() throws Exception { String errorScenario = "Error Scenario";
