Repository: aurora Updated Branches: refs/heads/master 4e7cdc422 -> 0f1e68401
Convert Webhook to AbstractIdleService, use async HTTP client Hijacking https://reviews.apache.org/r/59703 >From the above review: "Current code uses a synchronous HTTP client, which can >block the EventBus. Switch to an async HTTP client." Previously, we had an issue where the HTTP client would have a non-daemon thread which caused the Scheduler to fail to shutdown. I converted it into an AbstractIdleService and properly closed the client in the shutdown() method. Additionally, I made a small tweak to the original code where we ABORT any response receieved after the status since we don't care. We just use the response code for stats. Testing Done: ./gradlew test Tested proper shutdown occurs in Vagrant. Scale tested up to 2000 TASK_LOST events with the registered endpoint waiting 5-10 minutes to response -- does not seem to block scheduling. Bugs closed: AURORA-1773 Reviewed at https://reviews.apache.org/r/62700/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/0f1e6840 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/0f1e6840 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/0f1e6840 Branch: refs/heads/master Commit: 0f1e68401da64a9aa2fd507010d21b7d7d8ebf53 Parents: 4e7cdc4 Author: Jordan Ly <[email protected]> Authored: Wed Oct 4 13:07:19 2017 -0700 Committer: Santhosh Kumar <[email protected]> Committed: Wed Oct 4 13:07:19 2017 -0700 ---------------------------------------------------------------------- build.gradle | 10 +- .../apache/aurora/scheduler/events/Webhook.java | 117 ++++-- .../aurora/scheduler/events/WebhookInfo.java | 14 + .../aurora/scheduler/events/WebhookModule.java | 36 +- .../org/apache/aurora/scheduler/webhook.json | 4 +- .../aurora/scheduler/events/WebhookTest.java | 413 ++++++++++++------- 6 files changed, 371 insertions(+), 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 069c62e..d7a4287 100644 --- a/build.gradle +++ b/build.gradle @@ -82,14 +82,14 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169 ext.gsonRev = '2.3.1' ext.guavaRev = '20.0' ext.guiceRev = '3.0' - ext.httpclientRev = '4.5.2' - ext.httpcoreRev = '4.4.4' + ext.asyncHttpclientRev = '2.0.37' ext.jacksonRev = '2.5.1' ext.jerseyRev = '1.19' ext.jsrRev = '3.0.1' ext.junitRev = '4.12' ext.logbackRev = '1.2.3' ext.mybatisRev = '3.4.1' + ext.nettyRev = '4.0.52.Final' ext.protobufRev = '2.6.1' ext.servletRev = '3.1.0' ext.slf4jRev = '1.7.25' @@ -110,13 +110,12 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169 // See http://forums.gradle.org/gradle/topics/shouldnt-resolutionstrategy-affect-depending-projects-transitive-dependencies resolutionStrategy { failOnVersionConflict() - force "org.apache.httpcomponents:httpclient:${httpclientRev}" - force "org.apache.httpcomponents:httpcore:${httpcoreRev}" force "com.fasterxml.jackson.core:jackson-annotations:${jacksonRev}" force "com.fasterxml.jackson.core:jackson-core:${jacksonRev}" force "com.google.code.gson:gson:${gsonRev}" force "com.google.guava:guava:${guavaRev}" force "com.google.protobuf:protobuf-java:${protobufRev}" + force "io.netty:netty-handler:${nettyRev}" force "junit:junit:${junitRev}" force "org.apache.thrift:libthrift:${thriftRev}" force "org.apache.zookeeper:zookeeper:${zookeeperRev}" @@ -415,8 +414,7 @@ dependencies { compile "org.apache.curator:curator-framework:${curatorRev}" compile "org.apache.curator:curator-recipes:${curatorRev}" compile 'org.apache.mesos:mesos:1.2.0' - compile "org.apache.httpcomponents:httpclient:${httpclientRev}" - compile "org.apache.httpcomponents:httpcore:${httpcoreRev}" + compile "org.asynchttpclient:async-http-client:${asyncHttpclientRev}" compile "org.apache.shiro:shiro-guice:${shiroRev}" compile "org.apache.shiro:shiro-web:${shiroRev}" compile "org.apache.zookeeper:zookeeper:${zookeeperRev}" http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/main/java/org/apache/aurora/scheduler/events/Webhook.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/events/Webhook.java b/src/main/java/org/apache/aurora/scheduler/events/Webhook.java index 05f46a1..2af8118 100644 --- a/src/main/java/org/apache/aurora/scheduler/events/Webhook.java +++ b/src/main/java/org/apache/aurora/scheduler/events/Webhook.java @@ -13,59 +13,73 @@ */ package org.apache.aurora.scheduler.events; -import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.time.Instant; +import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.eventbus.Subscribe; +import com.google.common.util.concurrent.AbstractIdleService; import com.google.inject.Inject; +import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.util.EntityUtils; +import org.asynchttpclient.AsyncCompletionHandler; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.BoundRequestBuilder; +import org.asynchttpclient.HttpResponseStatus; +import org.asynchttpclient.Response; +import org.asynchttpclient.util.HttpConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.Objects.requireNonNull; + /** * Watches TaskStateChanges and send events to configured endpoint. */ -public class Webhook implements EventSubscriber { +public class Webhook extends AbstractIdleService implements EventSubscriber { + @VisibleForTesting + static final String ATTEMPTS_STAT_NAME = "webhooks_attempts"; + @VisibleForTesting + static final String SUCCESS_STAT_NAME = "webhooks_success"; + @VisibleForTesting + static final String ERRORS_STAT_NAME = "webhooks_errors"; + @VisibleForTesting + static final String USER_ERRORS_STAT_NAME = "webhooks_user_errors"; private static final Logger LOG = LoggerFactory.getLogger(Webhook.class); private final WebhookInfo webhookInfo; - private final CloseableHttpClient httpClient; + private final AsyncHttpClient httpClient; private final Predicate<ScheduleStatus> isWhitelisted; + private final AtomicLong attemptsCounter; + private final AtomicLong successCounter; + private final AtomicLong errorsCounter; + private final AtomicLong userErrorsCounter; + @Inject - Webhook(CloseableHttpClient httpClient, WebhookInfo webhookInfo) { - this.webhookInfo = webhookInfo; - this.httpClient = httpClient; - // A task status is whitelisted if: a) the whitelist is absent, or b) the task status is - // explicitly specified in the whitelist. + Webhook(AsyncHttpClient httpClient, WebhookInfo webhookInfo, StatsProvider statsProvider) { + this.webhookInfo = requireNonNull(webhookInfo); + this.httpClient = requireNonNull(httpClient); + this.attemptsCounter = statsProvider.makeCounter(ATTEMPTS_STAT_NAME); + this.successCounter = statsProvider.makeCounter(SUCCESS_STAT_NAME); + this.errorsCounter = statsProvider.makeCounter(ERRORS_STAT_NAME); + this.userErrorsCounter = statsProvider.makeCounter(USER_ERRORS_STAT_NAME); this.isWhitelisted = status -> !webhookInfo.getWhitelistedStatuses().isPresent() || webhookInfo.getWhitelistedStatuses().get().contains(status); LOG.info("Webhook enabled with info" + this.webhookInfo); } - private HttpPost createPostRequest(TaskStateChange stateChange) - throws UnsupportedEncodingException { - String eventJson = stateChange.toJson(); - HttpPost post = new HttpPost(); - post.setURI(webhookInfo.getTargetURI()); - post.setHeader("Timestamp", Long.toString(Instant.now().toEpochMilli())); - post.setEntity(new StringEntity(eventJson)); - webhookInfo.getHeaders().entrySet().forEach( - e -> post.setHeader(e.getKey(), e.getValue())); - return post; + private BoundRequestBuilder createRequest(TaskStateChange stateChange) { + return httpClient.preparePost(webhookInfo.getTargetURI().toString()) + .setBody(stateChange.toJson()) + .setSingleHeaders(webhookInfo.getHeaders()) + .addHeader("Timestamp", Long.toString(Instant.now().toEpochMilli())); } /** @@ -82,20 +96,49 @@ public class Webhook implements EventSubscriber { // resend the entire state. This check also ensures that only whitelisted statuses will be sent // to the configured endpoint. if (stateChange.getOldState().isPresent() && isWhitelisted.apply(stateChange.getNewState())) { + attemptsCounter.incrementAndGet(); try { - HttpPost post = createPostRequest(stateChange); - // Using try-with-resources on closeable and following - // https://hc.apache.org/httpcomponents-client-4.5.x/quickstart.html to make sure stream is - // closed after we get back a response to not leak http connections. - try (CloseableHttpResponse httpResponse = httpClient.execute(post)) { - HttpEntity entity = httpResponse.getEntity(); - EntityUtils.consumeQuietly(entity); - } catch (IOException exp) { - LOG.error("Error sending a Webhook event", exp); - } - } catch (UnsupportedEncodingException exp) { - LOG.error("HttpPost exception when creating an HTTP Post request", exp); + // We don't care about the response body, so only listen for the HTTP status code. + createRequest(stateChange).execute(new AsyncCompletionHandler<Integer>() { + @Override + public void onThrowable(Throwable t) { + errorsCounter.incrementAndGet(); + LOG.error("Error sending a Webhook event", t); + } + + @Override + public State onStatusReceived(HttpResponseStatus status) throws Exception { + if (status.getStatusCode() == HttpConstants.ResponseStatusCodes.OK_200) { + successCounter.incrementAndGet(); + } else { + userErrorsCounter.incrementAndGet(); + } + + // Abort after we get the status because that is all we use for processing. + return State.ABORT; + } + + @Override + public Integer onCompleted(Response response) throws Exception { + // We do not care about the full response. + return 0; + } + }); + } catch (Exception e) { + LOG.error("Error making Webhook request", e); + errorsCounter.incrementAndGet(); } } } + + @Override + protected void startUp() throws Exception { + // No-op + } + + @Override + protected void shutDown() throws Exception { + LOG.info("Shutting down async Webhook client."); + httpClient.close(); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java b/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java index da22c21..44789d6 100644 --- a/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java +++ b/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java @@ -129,6 +129,20 @@ public class WebhookInfo { return this; } + /** + * This method will add the supplied headers to the current headers. + * + * @param values The headers to add. + * @return The modified builder. + */ + public WebhookInfoBuilder setHeaders(Map<String, String> values) { + for (Map.Entry<String, String> entry : values.entrySet()) { + setHeader(entry.getKey(), entry.getValue()); + } + + return this; + } + public WebhookInfoBuilder setTargetURL(String targetURL) { this.targetURL = targetURL; return this; http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java b/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java index 1f10af7..8c9ea05 100644 --- a/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java +++ b/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java @@ -29,17 +29,18 @@ import org.apache.aurora.common.args.Arg; import org.apache.aurora.common.args.CmdLine; import org.apache.aurora.common.args.constraints.CanRead; import org.apache.aurora.common.args.constraints.Exists; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.conn.ConnectionKeepAliveStrategy; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy; -import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.channel.DefaultKeepAliveStrategy; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkArgument; +import static org.asynchttpclient.Dsl.asyncHttpClient; + /** * Binding module for webhook management. */ @@ -70,24 +71,23 @@ public class WebhookModule extends AbstractModule { protected void configure() { if (enableWebhook) { WebhookInfo webhookInfo = parseWebhookConfig(readWebhookFile()); - int timeout = webhookInfo.getConnectonTimeoutMsec(); - RequestConfig config = RequestConfig.custom() - .setConnectTimeout(timeout) // establish connection with server eg time to TCP handshake. - .setConnectionRequestTimeout(timeout) // get a connection from internal pool. - .setSocketTimeout(timeout) // wait for data after connection was established. + DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder() + .setConnectTimeout(webhookInfo.getConnectonTimeoutMsec()) + .setHandshakeTimeout(webhookInfo.getConnectonTimeoutMsec()) + .setSslSessionTimeout(webhookInfo.getConnectonTimeoutMsec()) + .setReadTimeout(webhookInfo.getConnectonTimeoutMsec()) + .setRequestTimeout(webhookInfo.getConnectonTimeoutMsec()) + .setKeepAliveStrategy(new DefaultKeepAliveStrategy()) .build(); - ConnectionKeepAliveStrategy connectionStrategy = new DefaultConnectionKeepAliveStrategy(); - CloseableHttpClient client = - HttpClientBuilder.create() - .setDefaultRequestConfig(config) - // being explicit about using default Keep-Alive strategy. - .setKeepAliveStrategy(connectionStrategy) - .build(); + AsyncHttpClient httpClient = asyncHttpClient(config); bind(WebhookInfo.class).toInstance(webhookInfo); - bind(CloseableHttpClient.class).toInstance(client); + bind(AsyncHttpClient.class).toInstance(httpClient); PubsubEventModule.bindSubscriber(binder(), Webhook.class); bind(Webhook.class).in(Singleton.class); + + SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()) + .to(Webhook.class); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/main/resources/org/apache/aurora/scheduler/webhook.json ---------------------------------------------------------------------- diff --git a/src/main/resources/org/apache/aurora/scheduler/webhook.json b/src/main/resources/org/apache/aurora/scheduler/webhook.json index b78c063..e645f64 100644 --- a/src/main/resources/org/apache/aurora/scheduler/webhook.json +++ b/src/main/resources/org/apache/aurora/scheduler/webhook.json @@ -3,6 +3,6 @@ "Content-Type": "application/vnd.kafka.json.v1+json", "Producer-Type": "reliable" }, - "targetURL": "http://localhost:5000/", - "timeoutMsec": 50 + "targetURL": "http://localhost:8080/", + "timeoutMsec": 5000 } http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java b/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java index 07f39fa..827aa2d 100644 --- a/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java +++ b/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java @@ -17,259 +17,352 @@ import java.io.IOException; import java.net.URI; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; -import com.google.common.collect.ImmutableMap; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; -import org.apache.aurora.common.testing.easymock.EasyMockTest; +import com.google.common.collect.ImmutableMap; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.events.WebhookInfo.WebhookInfoBuilder; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.http.Header; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.util.EntityUtils; -import org.easymock.Capture; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.asynchttpclient.AsyncHandler; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.ListenableFuture; +import org.asynchttpclient.channel.DefaultKeepAliveStrategy; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -public class WebhookTest extends EasyMockTest { +public class WebhookTest { + private static final String STATIC_URL = "http://localhost:8080/"; + private static final Integer TIMEOUT = 5000; + private static final Map<String, String> HEADERS = ImmutableMap.of( + "Content-Type", "application/vnd.kafka.json.v1+json", + "Producer-Type", "reliable" + ); private static final IScheduledTask TASK = TaskTestUtil.makeTask("id", TaskTestUtil.JOB); private static final TaskStateChange CHANGE = TaskStateChange.initialized(TASK); - private static final TaskStateChange CHANGE_WITH_OLD_STATE = TaskStateChange + private static final TaskStateChange CHANGE_OLD_STATE = TaskStateChange .transition(TASK, ScheduleStatus.FAILED); - private static final String CHANGE_JSON = CHANGE_WITH_OLD_STATE.toJson(); + private static final TaskStateChange CHANGE_LOST = TaskStateChange.transition( + TaskTestUtil.addStateTransition(TASK, ScheduleStatus.LOST, 1L), ScheduleStatus.LOST); + private static final String CHANGE_JSON = CHANGE_OLD_STATE.toJson(); + private static final String CHANGE_LOST_JSON = CHANGE_LOST.toJson(); + // Below are test fixtures for WebhookInfoBuilders. Callers will need to specify the desired + // targetURL and build manually to get the desired WebhookInfo. We do this because we allocate + // an ephemeral port for our test Jetty server, meaning we cannot specify WebhookInfo statically. // Test fixture for WebhookInfo without a whitelist, thus all task statuses are implicitly // whitelisted. - private static final WebhookInfo WEBHOOK_INFO = WebhookInfo.newBuilder() - .setHeader("Content-Type", "application/vnd.kafka.json.v1+json") - .setHeader("Producer-Type", "reliable") - .setTargetURL("http://localhost:5000/") - .setTimeout(50) - .build(); + private static final WebhookInfoBuilder WEBHOOK_INFO_BUILDER = WebhookInfo + .newBuilder() + .setHeaders(HEADERS) + .setTimeout(TIMEOUT); // Test fixture for WebhookInfo in which only "LOST" and "FAILED" task statuses are explicitly // whitelisted. - private static final WebhookInfo WEBHOOK_INFO_WITH_WHITELIST = WebhookInfo.newBuilder() - .setHeader("Content-Type", "application/vnd.kafka.json.v1+json") - .setHeader("Producer-Type", "reliable") - .setTargetURL("http://localhost:5000/") - .setTimeout(50) + private static final WebhookInfoBuilder WEBHOOK_INFO_WITH_WHITELIST_BUILDER = WebhookInfo + .newBuilder() + .setHeaders(HEADERS) + .setTimeout(TIMEOUT) .addWhitelistedStatus("LOST") - .addWhitelistedStatus("FAILED") - .build(); + .addWhitelistedStatus("FAILED"); // Test fixture for WebhookInfo in which all task statuses are whitelisted by wildcard character. - private static final WebhookInfo WEBHOOK_INFO_WITH_WILDCARD_WHITELIST = WebhookInfo.newBuilder() - .setHeader("Content-Type", "application/vnd.kafka.json.v1+json") - .setHeader("Producer-Type", "reliable") - .setTargetURL("http://localhost:5000/") - .setTimeout(50) - .addWhitelistedStatus("*") - .build(); + private static final WebhookInfoBuilder WEBHOOK_INFO_WITH_WILDCARD_WHITELIST_BUILDER = + WebhookInfo + .newBuilder() + .setHeaders(HEADERS) + .setTimeout(TIMEOUT) + .addWhitelistedStatus("*"); + + private Server jettyServer; + private AsyncHttpClient httpClient; + private FakeStatsProvider statsProvider; + + /** + * Wrap the DefaultAsyncHttpClient for testing so we can complete futures synchronously before + * validating assertions. Otherwise, we would have to call `Thread.sleep` in our tests after + * each TaskStateChange. + */ + static class WebhookAsyncHttpClientWrapper extends DefaultAsyncHttpClient { - private CloseableHttpClient httpClient; - private Webhook webhook; + WebhookAsyncHttpClientWrapper(DefaultAsyncHttpClientConfig config) { + super(config); + } + + @Override + public <T> ListenableFuture<T> executeRequest(org.asynchttpclient.Request request, + AsyncHandler<T> handler) { + ListenableFuture<T> future = super.executeRequest(request, handler); + try { + future.get(); + future.done(); + } catch (InterruptedException | ExecutionException e) { + // Ignore, future should not fail to resolve. + } + return future; + } + } @Before - public void setUp() { - httpClient = createMock(CloseableHttpClient.class); - webhook = new Webhook(httpClient, WEBHOOK_INFO); + public void setUp() throws Exception { + DefaultAsyncHttpClientConfig testConfig = new DefaultAsyncHttpClientConfig.Builder() + .setConnectTimeout(TIMEOUT) + .setHandshakeTimeout(TIMEOUT) + .setSslSessionTimeout(TIMEOUT) + .setReadTimeout(TIMEOUT) + .setRequestTimeout(TIMEOUT) + .setKeepAliveStrategy(new DefaultKeepAliveStrategy()) + .build(); + httpClient = new WebhookAsyncHttpClientWrapper(testConfig); + statsProvider = new FakeStatsProvider(); + jettyServer = new Server(0); // Start Jetty server with ephemeral port + } + + @After + public void tearDown() throws Exception { + jettyServer.stop(); } @Test public void testTaskChangedStateNoOldState() throws Exception { + WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_BUILDER); + Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider); + // Should be a noop as oldState is MIA so this test would have throw an exception. // If it does not, then we are good. - control.replay(); - webhook.taskChangedState(CHANGE); } @Test - public void testTaskChangedWithOldState() throws Exception { - CloseableHttpResponse httpResponse = createMock(CloseableHttpResponse.class); - HttpEntity entity = createMock(HttpEntity.class); - - Capture<HttpPost> httpPostCapture = createCapture(); - expect(entity.isStreaming()).andReturn(false); - expect(httpResponse.getEntity()).andReturn(entity); - httpResponse.close(); - expectLastCall().once(); - expect(httpClient.execute(capture(httpPostCapture))).andReturn(httpResponse); - - control.replay(); - - webhook.taskChangedState(CHANGE_WITH_OLD_STATE); - - assertTrue(httpPostCapture.hasCaptured()); - assertEquals(httpPostCapture.getValue().getURI(), new URI("http://localhost:5000/")); - assertEquals(EntityUtils.toString(httpPostCapture.getValue().getEntity()), CHANGE_JSON); - Header[] producerTypeHeader = httpPostCapture.getValue().getHeaders("Producer-Type"); - assertEquals(producerTypeHeader.length, 1); - assertEquals(producerTypeHeader[0].getName(), "Producer-Type"); - assertEquals(producerTypeHeader[0].getValue(), "reliable"); - Header[] contentTypeHeader = httpPostCapture.getValue().getHeaders("Content-Type"); - assertEquals(contentTypeHeader.length, 1); - assertEquals(contentTypeHeader[0].getName(), "Content-Type"); - assertEquals(contentTypeHeader[0].getValue(), "application/vnd.kafka.json.v1+json"); - assertNotNull(httpPostCapture.getValue().getHeaders("Timestamp")); + public void testTaskChangedWithOldStateSuccess() throws Exception { + jettyServer.setHandler(createHandlerThatExpectsContent(CHANGE_JSON)); + jettyServer.start(); + WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_BUILDER); + Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider); + + webhook.taskChangedState(CHANGE_OLD_STATE); + + assertEquals(1, statsProvider.getLongValue(Webhook.ATTEMPTS_STAT_NAME)); + assertEquals(1, statsProvider.getLongValue(Webhook.SUCCESS_STAT_NAME)); + assertEquals(0, statsProvider.getLongValue(Webhook.ERRORS_STAT_NAME)); + assertEquals(0, statsProvider.getLongValue(Webhook.USER_ERRORS_STAT_NAME)); } @Test - public void testTaskChangeInWhiteList() throws Exception { - CloseableHttpResponse httpResponse = createMock(CloseableHttpResponse.class); - HttpEntity entity = createMock(HttpEntity.class); + public void testTaskChangedWithOldStateUserError() throws Exception { + // We expect CHANGE_JSON but get CHANGE_LOST which causes an error code to be returned. + jettyServer.setHandler(createHandlerThatExpectsContent(CHANGE_JSON)); + jettyServer.start(); + WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_BUILDER); + Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider); + + webhook.taskChangedState(CHANGE_LOST); - Capture<HttpPost> httpPostCapture = createCapture(); - expect(entity.isStreaming()).andReturn(false); - expect(httpResponse.getEntity()).andReturn(entity); - httpResponse.close(); - expectLastCall().once(); - expect(httpClient.execute(capture(httpPostCapture))).andReturn(httpResponse); + assertEquals(1, statsProvider.getLongValue(Webhook.ATTEMPTS_STAT_NAME)); + assertEquals(0, statsProvider.getLongValue(Webhook.SUCCESS_STAT_NAME)); + assertEquals(0, statsProvider.getLongValue(Webhook.ERRORS_STAT_NAME)); + assertEquals(1, statsProvider.getLongValue(Webhook.USER_ERRORS_STAT_NAME)); + } - control.replay(); + @Test + public void testTaskChangedWithOldStateError() throws Exception { + // We have a special handler here to cause a TimeoutException to trigger `onThrowable` + jettyServer.setHandler(new AbstractHandler() { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, + HttpServletResponse response) throws IOException, ServletException { + try { + Thread.sleep(TIMEOUT + 100); + } catch (InterruptedException e) { + // Should never get here. + } + } + }); + jettyServer.start(); + WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_BUILDER); + Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider); - // Verifying TaskStateChange in the whitelist is sent to the configured endpoint. - Webhook webhookWithWhitelist = new Webhook(httpClient, WEBHOOK_INFO_WITH_WHITELIST); - TaskStateChange taskStateChangeInWhitelist = TaskStateChange - .transition(TaskTestUtil.addStateTransition(TASK, ScheduleStatus.LOST, 1L), - ScheduleStatus.RUNNING); - webhookWithWhitelist.taskChangedState(taskStateChangeInWhitelist); - - assertTrue(httpPostCapture.hasCaptured()); - assertEquals(httpPostCapture.getValue().getURI(), new URI("http://localhost:5000/")); - assertEquals(EntityUtils.toString(httpPostCapture.getValue().getEntity()), - taskStateChangeInWhitelist.toJson()); - Header[] producerTypeHeader = httpPostCapture.getValue().getHeaders("Producer-Type"); - assertEquals(producerTypeHeader.length, 1); - assertEquals(producerTypeHeader[0].getName(), "Producer-Type"); - assertEquals(producerTypeHeader[0].getValue(), "reliable"); - Header[] contentTypeHeader = httpPostCapture.getValue().getHeaders("Content-Type"); - assertEquals(contentTypeHeader.length, 1); - assertEquals(contentTypeHeader[0].getName(), "Content-Type"); - assertEquals(contentTypeHeader[0].getValue(), "application/vnd.kafka.json.v1+json"); - assertNotNull(httpPostCapture.getValue().getHeaders("Timestamp")); + webhook.taskChangedState(CHANGE_OLD_STATE); + + assertEquals(1, statsProvider.getLongValue(Webhook.ATTEMPTS_STAT_NAME)); + assertEquals(0, statsProvider.getLongValue(Webhook.SUCCESS_STAT_NAME)); + assertEquals(1, statsProvider.getLongValue(Webhook.ERRORS_STAT_NAME)); + assertEquals(0, statsProvider.getLongValue(Webhook.USER_ERRORS_STAT_NAME)); } @Test - public void testTaskChangeNotInWhiteList() throws Exception { - control.replay(); + public void testTaskChangeInWhiteList() throws Exception { + // Verifying TaskStateChange in the whitelist is sent to the configured endpoint. + jettyServer.setHandler(createHandlerThatExpectsContent(CHANGE_LOST_JSON)); + jettyServer.start(); + WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_WITH_WHITELIST_BUILDER); + Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider); - // Verifying TaskStateChange not in the whitelist is not sent to the configured endpoint. - Webhook webhookWithWhitelist = new Webhook(httpClient, WEBHOOK_INFO_WITH_WHITELIST); - webhookWithWhitelist.taskChangedState(CHANGE_WITH_OLD_STATE); + webhook.taskChangedState(CHANGE_LOST); + + assertEquals(1, statsProvider.getLongValue(Webhook.ATTEMPTS_STAT_NAME)); + assertEquals(1, statsProvider.getLongValue(Webhook.SUCCESS_STAT_NAME)); + assertEquals(0, statsProvider.getLongValue(Webhook.ERRORS_STAT_NAME)); + assertEquals(0, statsProvider.getLongValue(Webhook.USER_ERRORS_STAT_NAME)); } @Test - public void testCatchHttpClientException() throws Exception { - // IOException should be silenced. - Capture<HttpPost> httpPostCapture = createCapture(); - expect(httpClient.execute(capture(httpPostCapture))) - .andThrow(new IOException()); - control.replay(); - - webhook.taskChangedState(CHANGE_WITH_OLD_STATE); + public void testTaskChangeNotInWhiteList() throws Exception { + // Verifying TaskStateChange not in the whitelist is not sent to the configured endpoint. + jettyServer.setHandler(createHandlerThatExpectsContent(CHANGE_JSON)); + jettyServer.start(); + WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_WITH_WHITELIST_BUILDER); + Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider); + + webhook.taskChangedState(CHANGE_OLD_STATE); } @Test public void testParsingWebhookInfo() throws Exception { + WebhookInfo webhookInfo = WEBHOOK_INFO_BUILDER + .setTargetURL(STATIC_URL) + .build(); + WebhookInfo parsedWebhookInfo = WebhookModule.parseWebhookConfig( WebhookModule.readWebhookFile()); // Verifying the WebhookInfo parsed from webhook.json file is identical to the WebhookInfo // built from WebhookInfoBuilder. - assertEquals(parsedWebhookInfo.toString(), WEBHOOK_INFO.toString()); + assertEquals(parsedWebhookInfo.toString(), webhookInfo.toString()); // Verifying all attributes were parsed correctly. - assertEquals(parsedWebhookInfo.getHeaders(), WEBHOOK_INFO.getHeaders()); - assertEquals(parsedWebhookInfo.getTargetURI(), WEBHOOK_INFO.getTargetURI()); + assertEquals(parsedWebhookInfo.getHeaders(), webhookInfo.getHeaders()); + assertEquals(parsedWebhookInfo.getTargetURI(), webhookInfo.getTargetURI()); assertEquals(parsedWebhookInfo.getConnectonTimeoutMsec(), - WEBHOOK_INFO.getConnectonTimeoutMsec()); - assertEquals(parsedWebhookInfo.getWhitelistedStatuses(), WEBHOOK_INFO.getWhitelistedStatuses()); - control.replay(); + webhookInfo.getConnectonTimeoutMsec()); + assertEquals(parsedWebhookInfo.getWhitelistedStatuses(), webhookInfo.getWhitelistedStatuses()); } @Test public void testWebhookInfo() throws Exception { - assertEquals(WEBHOOK_INFO.toString(), + WebhookInfo webhookInfo = WEBHOOK_INFO_BUILDER + .setTargetURL(STATIC_URL) + .build(); + + assertEquals(webhookInfo.toString(), "WebhookInfo{headers={" + "Content-Type=application/vnd.kafka.json.v1+json, " + "Producer-Type=reliable" + "}, " - + "targetURI=http://localhost:5000/, " - + "connectTimeoutMsec=50, " + + "targetURI=http://localhost:8080/, " + + "connectTimeoutMsec=5000, " + "whitelistedStatuses=null" + "}"); // Verifying all attributes were set correctly. - Map<String, String> headers = ImmutableMap.of( - "Content-Type", "application/vnd.kafka.json.v1+json", - "Producer-Type", "reliable"); - assertEquals(WEBHOOK_INFO.getHeaders(), headers); - URI targetURI = new URI("http://localhost:5000/"); - assertEquals(WEBHOOK_INFO.getTargetURI(), targetURI); - Integer timeoutMsec = 50; - assertEquals(WEBHOOK_INFO.getConnectonTimeoutMsec(), timeoutMsec); - assertFalse(WEBHOOK_INFO.getWhitelistedStatuses().isPresent()); - control.replay(); + assertEquals(webhookInfo.getHeaders(), HEADERS); + assertEquals(webhookInfo.getTargetURI(), URI.create(STATIC_URL)); + assertEquals(webhookInfo.getConnectonTimeoutMsec(), TIMEOUT); + assertFalse(webhookInfo.getWhitelistedStatuses().isPresent()); } @Test public void testWebhookInfoWithWhiteList() throws Exception { - assertEquals(WEBHOOK_INFO_WITH_WHITELIST.toString(), + WebhookInfo webhookInfoWithWhitelist = WEBHOOK_INFO_WITH_WHITELIST_BUILDER + .setTargetURL(STATIC_URL) + .build(); + + assertEquals(webhookInfoWithWhitelist.toString(), "WebhookInfo{headers={" + "Content-Type=application/vnd.kafka.json.v1+json, " + "Producer-Type=reliable" + "}, " - + "targetURI=http://localhost:5000/, " - + "connectTimeoutMsec=50, " + + "targetURI=http://localhost:8080/, " + + "connectTimeoutMsec=5000, " + "whitelistedStatuses=[LOST, FAILED]" + "}"); // Verifying all attributes were set correctly. - Map<String, String> headers = ImmutableMap.of( - "Content-Type", "application/vnd.kafka.json.v1+json", - "Producer-Type", "reliable"); - assertEquals(WEBHOOK_INFO_WITH_WHITELIST.getHeaders(), headers); - URI targetURI = new URI("http://localhost:5000/"); - assertEquals(WEBHOOK_INFO_WITH_WHITELIST.getTargetURI(), targetURI); - Integer timeoutMsec = 50; - assertEquals(WEBHOOK_INFO_WITH_WHITELIST.getConnectonTimeoutMsec(), timeoutMsec); - List<ScheduleStatus> statuses = WEBHOOK_INFO_WITH_WHITELIST.getWhitelistedStatuses().get(); + assertEquals(webhookInfoWithWhitelist.getHeaders(), HEADERS); + assertEquals(webhookInfoWithWhitelist.getTargetURI(), URI.create(STATIC_URL)); + assertEquals(webhookInfoWithWhitelist.getConnectonTimeoutMsec(), TIMEOUT); + List<ScheduleStatus> statuses = webhookInfoWithWhitelist.getWhitelistedStatuses().get(); assertEquals(statuses.size(), 2); assertEquals(statuses.get(0), ScheduleStatus.LOST); assertEquals(statuses.get(1), ScheduleStatus.FAILED); - control.replay(); } @Test public void testWebhookInfoWithWildcardWhitelist() throws Exception { - assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.toString(), + WebhookInfo webhookInfoWithWildcardWhitelist = WEBHOOK_INFO_WITH_WILDCARD_WHITELIST_BUILDER + .setTargetURL(STATIC_URL) + .build(); + + assertEquals(webhookInfoWithWildcardWhitelist.toString(), "WebhookInfo{headers={" + "Content-Type=application/vnd.kafka.json.v1+json, " + "Producer-Type=reliable" + "}, " - + "targetURI=http://localhost:5000/, " - + "connectTimeoutMsec=50, " + + "targetURI=http://localhost:8080/, " + + "connectTimeoutMsec=5000, " + "whitelistedStatuses=null" + "}"); // Verifying all attributes were set correctly. - Map<String, String> headers = ImmutableMap.of( - "Content-Type", "application/vnd.kafka.json.v1+json", - "Producer-Type", "reliable"); - assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getHeaders(), headers); - URI targetURI = new URI("http://localhost:5000/"); - assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getTargetURI(), targetURI); - Integer timeoutMsec = 50; - assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getConnectonTimeoutMsec(), timeoutMsec); - assertFalse(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getWhitelistedStatuses().isPresent()); - control.replay(); + assertEquals(webhookInfoWithWildcardWhitelist.getHeaders(), HEADERS); + assertEquals(webhookInfoWithWildcardWhitelist.getTargetURI(), URI.create(STATIC_URL)); + assertEquals(webhookInfoWithWildcardWhitelist.getConnectonTimeoutMsec(), TIMEOUT); + assertFalse(webhookInfoWithWildcardWhitelist.getWhitelistedStatuses().isPresent()); + } + + /** Create a Jetty handler that expects a request with a given content body. */ + private AbstractHandler createHandlerThatExpectsContent(String expected) { + return new AbstractHandler() { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, + HttpServletResponse response) throws IOException, ServletException { + String body = request.getReader().lines().collect(Collectors.joining()); + if (validateRequest(request) && body.equals(expected)) { + response.setStatus(HttpServletResponse.SC_OK); + } else { + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + baseRequest.setHandled(true); + } + }; + } + + /** Validate that the request is what we are expecting to send out (ex. POST, headers). */ + private boolean validateRequest(HttpServletRequest request) { + // Validate general fields are what we expect (POST, headers). + if (!request.getMethod().equals("POST")) { + return false; + } + + for (Map.Entry<String, String> header : HEADERS.entrySet()) { + String expectedKey = header.getKey(); + String expectedValue = header.getValue(); + + if (!expectedValue.equals(request.getHeader(expectedKey))) { + return false; + } + } + + return true; + } + + /** + * Need this method to build `WebhookInfo` for testing with the running Jetty port. `jettyServer` + * should have been started before this method is called. + */ + private WebhookInfo buildWebhookInfoWithJettyPort(WebhookInfoBuilder builder) { + String fullUrl = String.format("http://localhost:%d", jettyServer.getURI().getPort()); + return builder + .setTargetURL(fullUrl) + .build(); } }
