Repository: aurora
Updated Branches:
  refs/heads/master 4c4040fd2 -> 4745c8cc2


Refactor of Webhook and no longer posting entire task state via webhook on 
scheduler restart

This refactor injects an HttpClient into the Webhook class, replacing the 
previous use of lower-level HttpURLConnection objects. 
Additionally, due to performance issues, it is unnecessary to POST the entire 
task state to the webhook endpoint on every scheduler restart, so that 
behavior is removed in this patch.

Bugs closed: AURORA-1772

Reviewed at https://reviews.apache.org/r/51980/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4745c8cc
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4745c8cc
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4745c8cc

Branch: refs/heads/master
Commit: 4745c8cc205e3fb1bfba25c8bc8bce838964186c
Parents: 4c4040f
Author: Dmitriy Shirchenko <cald...@gmail.com>
Authored: Tue Sep 20 18:57:24 2016 +0200
Committer: Stephan Erb <s...@apache.org>
Committed: Tue Sep 20 18:57:24 2016 +0200

----------------------------------------------------------------------
 build.gradle                                    |  6 ++
 .../apache/aurora/scheduler/events/Webhook.java | 92 +++++++------------
 .../aurora/scheduler/events/WebhookInfo.java    | 29 +++---
 .../aurora/scheduler/events/WebhookModule.java  | 23 ++++-
 .../org/apache/aurora/scheduler/webhook.json    |  2 +-
 .../aurora/scheduler/events/WebhookTest.java    | 96 +++++++++++++++-----
 6 files changed, 149 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/4745c8cc/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index d5a3a7a..07689f9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -84,6 +84,8 @@ For more details, please see 
https://issues.apache.org/jira/browse/AURORA-1169
   ext.gsonRev = '2.3.1'
   ext.guavaRev = '19.0'
   ext.guiceRev = '3.0'
+  ext.httpclientRev = '4.5.2'
+  ext.httpcoreRev = '4.4.4'
   ext.jacksonRev = '2.5.1'
   ext.jerseyRev = '1.19'
   ext.jsrRev = '3.0.1'
@@ -109,6 +111,8 @@ For more details, please see 
https://issues.apache.org/jira/browse/AURORA-1169
       // See 
http://forums.gradle.org/gradle/topics/shouldnt-resolutionstrategy-affect-depending-projects-transitive-dependencies
       resolutionStrategy {
         failOnVersionConflict()
+        force "org.apache.httpcomponents:httpclient:${httpclientRev}"
+        force "org.apache.httpcomponents:httpcore:${httpcoreRev}"
         force "com.fasterxml.jackson.core:jackson-annotations:${jacksonRev}"
         force "com.fasterxml.jackson.core:jackson-core:${jacksonRev}"
         force "com.google.code.gson:gson:${gsonRev}"
@@ -366,6 +370,8 @@ dependencies {
   compile "org.apache.curator:curator-framework:${curatorRev}"
   compile "org.apache.curator:curator-recipes:${curatorRev}"
   compile 'org.apache.mesos:mesos:1.0.0'
+  compile "org.apache.httpcomponents:httpclient:${httpclientRev}"
+  compile "org.apache.httpcomponents:httpcore:${httpcoreRev}"
   compile "org.apache.shiro:shiro-guice:${shiroRev}"
   compile "org.apache.shiro:shiro-web:${shiroRev}"
   compile "org.apache.zookeeper:zookeeper:${zookeeperRev}"

http://git-wip-us.apache.org/repos/asf/aurora/blob/4745c8cc/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/Webhook.java 
b/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
index e54aa19..66c1344 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
@@ -13,10 +13,8 @@
  */
 package org.apache.aurora.scheduler.events;
 
-import java.io.DataOutputStream;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.time.Instant;
 
 import com.google.common.eventbus.Subscribe;
@@ -25,6 +23,9 @@ import com.google.inject.Inject;
 
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,64 +36,26 @@ public class Webhook implements EventSubscriber {
 
   private static final Logger LOG = LoggerFactory.getLogger(Webhook.class);
 
-  private static final String CALL_METHOD = "POST";
   private final WebhookInfo webhookInfo;
+  private final HttpClient httpClient;
 
   @Inject
-  Webhook(WebhookInfo webhookInfo) {
+  Webhook(HttpClient httpClient, WebhookInfo webhookInfo) {
     this.webhookInfo = webhookInfo;
-    LOG.debug("Webhook enabled with info" + this.webhookInfo);
+    this.httpClient = httpClient;
+    LOG.info("Webhook enabled with info" + this.webhookInfo);
   }
 
-  private HttpURLConnection initializeConnection() {
-    try {
-      final HttpURLConnection connection = (HttpURLConnection) new URL(
-          this.webhookInfo.getTargetURL()).openConnection();
-      connection.setRequestMethod(CALL_METHOD);
-      connection.setConnectTimeout(this.webhookInfo.getConnectonTimeout());
-
-      webhookInfo.getHeaders().entrySet().forEach(
-          e -> connection.setRequestProperty(e.getKey(), e.getValue()));
-      connection.setRequestProperty("TimeStamp", 
Long.toString(Instant.now().toEpochMilli()));
-      connection.setDoOutput(true);
-      return connection;
-    } catch (Exception e) {
-      // Do nothing since we are just doing best-effort here.
-      LOG.error("Exception trying to initialize a connection:", e);
-      return null;
-    }
-  }
-
-  /**
-   * Calls a specified endpoint with the provided string representing an 
internal event.
-   *
-   * @param eventJson String represenation of task state change.
-   */
-  public void callEndpoint(String eventJson) {
-    HttpURLConnection connection = this.initializeConnection();
-    if (connection == null) {
-      LOG.error("Received a null object when trying to initialize an HTTP 
connection");
-    } else {
-      try {
-        try (DataOutputStream wr = new 
DataOutputStream(connection.getOutputStream())) {
-          wr.writeBytes(eventJson);
-          LOG.debug("Sending message " + eventJson
-              + " with connection info " + connection.toString()
-              + " with WebhookInfo " + this.webhookInfo.toString());
-        } catch (Exception e) {
-          InputStream errorStream = connection.getErrorStream();
-          if (errorStream != null) {
-            errorStream.close();
-          }
-          LOG.error("Exception writing via HTTP connection", e);
-        }
-        // Don't care about reading input so just performing basic close() 
operation.
-        connection.getInputStream().close();
-      } catch (Exception e) {
-        LOG.error("Exception when sending a task change event", e);
-      }
-    }
-    LOG.debug("Done with Webhook call");
+  private HttpPost createPostRequest(TaskStateChange stateChange)
+      throws UnsupportedEncodingException {
+    String eventJson = stateChange.toJson();
+    HttpPost post = new HttpPost();
+    post.setURI(webhookInfo.getTargetURI());
+    post.setHeader("Timestamp", Long.toString(Instant.now().toEpochMilli()));
+    post.setEntity(new StringEntity(eventJson));
+    webhookInfo.getHeaders().entrySet().forEach(
+        e -> post.setHeader(e.getKey(), e.getValue()));
+    return post;
   }
 
   /**
@@ -104,7 +67,20 @@ public class Webhook implements EventSubscriber {
    */
   @Subscribe
   public void taskChangedState(TaskStateChange stateChange) {
-    String eventJson = stateChange.toJson();
-    callEndpoint(eventJson);
+    LOG.debug("Got an event: " + stateChange.toString());
+    // Old state is not present because a scheduler just failed over. In that 
case we do not want to
+    // resend the entire state.
+    if (stateChange.getOldState().isPresent()) {
+      try {
+        HttpPost post = createPostRequest(stateChange);
+        try {
+          httpClient.execute(post);
+        }  catch (IOException exp) {
+          LOG.error("Error sending a Webhook event", exp);
+        }
+      } catch (UnsupportedEncodingException exp) {
+        LOG.error("HttpPost exception when creating an HTTP Post request", 
exp);
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4745c8cc/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java 
b/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
index e4193f7..37c0d79 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
@@ -13,6 +13,8 @@
  */
 package org.apache.aurora.scheduler.events;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Map;
 
 import com.google.common.base.MoreObjects;
@@ -28,9 +30,9 @@ import static java.util.Objects.requireNonNull;
  * Defines configuration for Webhook.
  */
 public class WebhookInfo {
-  private final Integer connectTimeout;
+  private final Integer connectTimeoutMsec;
   private final Map<String, String> headers;
-  private final String targetURL;
+  private final URI targetURI;
 
   /**
    * Return key:value pairs of headers to set for every connection.
@@ -42,12 +44,12 @@ public class WebhookInfo {
   }
 
   /**
-   * Returns URL to post events to.
+   * Returns URI where to post events.
    *
-   * @return String
+   * @return URI
    */
-  public String getTargetURL() {
-    return this.targetURL;
+  URI getTargetURI() {
+    return targetURI;
   }
 
   /**
@@ -55,28 +57,27 @@ public class WebhookInfo {
    *
    * @return Integer value.
    */
-  public Integer getConnectonTimeout() {
-    return this.connectTimeout;
+  Integer getConnectonTimeoutMsec() {
+    return connectTimeoutMsec;
   }
 
   @JsonCreator
   public WebhookInfo(
        @JsonProperty("headers") Map<String, String> headers,
        @JsonProperty("targetURL") String targetURL,
-       @JsonProperty("timeoutMsec") Integer timeout) {
+       @JsonProperty("timeoutMsec") Integer timeout) throws URISyntaxException 
{
 
-    requireNonNull(targetURL);
     this.headers = ImmutableMap.copyOf(headers);
-    this.targetURL = requireNonNull(targetURL);
-    this.connectTimeout = requireNonNull(timeout);
+    this.targetURI = new URI(requireNonNull(targetURL));
+    this.connectTimeoutMsec = requireNonNull(timeout);
   }
 
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
       .add("headers", headers.toString())
-      .add("targetURL", targetURL)
-      .add("connectTimeout", connectTimeout)
+      .add("targetURI", targetURI.toString())
+      .add("connectTimeoutMsec", connectTimeoutMsec)
       .toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4745c8cc/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java 
b/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java
index eaa7053..71aae98 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java
@@ -30,6 +30,11 @@ import org.apache.aurora.common.args.Arg;
 import org.apache.aurora.common.args.CmdLine;
 import org.apache.aurora.common.args.constraints.CanRead;
 import org.apache.aurora.common.args.constraints.Exists;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,7 +70,23 @@ public class WebhookModule extends AbstractModule {
   @Override
   protected void configure() {
     if (enableWebhook) {
-      
bind(WebhookInfo.class).toInstance(parseWebhookConfig(readWebhookFile()));
+      WebhookInfo webhookInfo = parseWebhookConfig(readWebhookFile());
+      int timeout = webhookInfo.getConnectonTimeoutMsec();
+      RequestConfig config = RequestConfig.custom()
+          .setConnectTimeout(timeout) // establish connection with server eg 
time to TCP handshake.
+          .setConnectionRequestTimeout(timeout)  // get a connection from 
internal pool.
+          .setSocketTimeout(timeout) // wait for data after connection was 
established.
+          .build();
+      ConnectionKeepAliveStrategy connectionStrategy = new 
DefaultConnectionKeepAliveStrategy();
+      HttpClient client =
+          HttpClientBuilder.create()
+              .setDefaultRequestConfig(config)
+              // being explicit about using default Keep-Alive strategy.
+              .setKeepAliveStrategy(connectionStrategy)
+              .build();
+
+      bind(WebhookInfo.class).toInstance(webhookInfo);
+      bind(HttpClient.class).toInstance(client);
       PubsubEventModule.bindSubscriber(binder(), Webhook.class);
       bind(Webhook.class).in(Singleton.class);
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4745c8cc/src/main/resources/org/apache/aurora/scheduler/webhook.json
----------------------------------------------------------------------
diff --git a/src/main/resources/org/apache/aurora/scheduler/webhook.json 
b/src/main/resources/org/apache/aurora/scheduler/webhook.json
index 0078798..b78c063 100644
--- a/src/main/resources/org/apache/aurora/scheduler/webhook.json
+++ b/src/main/resources/org/apache/aurora/scheduler/webhook.json
@@ -4,5 +4,5 @@
     "Producer-Type": "reliable"
   },
   "targetURL": "http://localhost:5000/",
-  "timeoutMsec": 5
+  "timeoutMsec": 50
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4745c8cc/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java 
b/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
index 488eefd..6f37baa 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
@@ -13,68 +13,114 @@
  */
 package org.apache.aurora.scheduler.events;
 
-import com.google.common.eventbus.EventBus;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
 
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 
+import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.http.Header;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.util.EntityUtils;
+import org.easymock.Capture;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class WebhookTest extends EasyMockTest {
+
   private static final IScheduledTask TASK = TaskTestUtil.makeTask("id", 
TaskTestUtil.JOB);
-  private Webhook realWebhook;
+  private final TaskStateChange change = TaskStateChange.initialized(TASK);
+  private final TaskStateChange changeWithOldState = TaskStateChange
+      .transition(TASK, ScheduleStatus.FAILED);
+  private final String changeJson = changeWithOldState.toJson();
+
+  private HttpClient httpClient;
   private Webhook webhook;
-  private EventBus eventBus;
 
   @Before
   public void setUp() {
-    webhook = createMock(Webhook.class);
-    eventBus = new EventBus();
-    eventBus.register(webhook);
-    WebhookInfo webhookInfo = WebhookModule.parseWebhookConfig(
-        "{\"headers\": {\"Producer-Type\": \"reliable\","
-            + " \"Content-Type\": \"application/vnd.kafka.json.v1+json\"},"
-            + " \"timeoutMsec\": 1,"
-            + " \"targetURL\": \"http://localhost:5000/\"}";
-    );
-    realWebhook = new Webhook(webhookInfo);
+    WebhookInfo webhookInfo = 
WebhookModule.parseWebhookConfig(WebhookModule.readWebhookFile());
+    httpClient = createMock(HttpClient.class);
+    webhook = new Webhook(httpClient, webhookInfo);
   }
 
-  private final TaskStateChange change = TaskStateChange.initialized(TASK);
-  private final String changeJson = TaskStateChange.initialized(TASK).toJson();
-
   @Test
-  public void testTaskChangedState() {
+  public void testTaskChangedStateNoOldState() throws Exception {
+    // Should be a noop as oldState is MIA so this test would have throw an 
exception.
+    // If it does not, then we are good.
+    control.replay();
     webhook.taskChangedState(change);
+  }
+
+  @Test
+  public void testTaskChangedWithOldState() throws Exception {
+    Capture<HttpPost> httpPostCapture = createCapture();
+    expect(httpClient.execute(capture(httpPostCapture))).andReturn(null);
 
     control.replay();
 
-    eventBus.post(change);
+    webhook.taskChangedState(changeWithOldState);
+
+    assertTrue(httpPostCapture.hasCaptured());
+    assertEquals(httpPostCapture.getValue().getURI(), new 
URI("http://localhost:5000/";));
+    assertEquals(EntityUtils.toString(httpPostCapture.getValue().getEntity()), 
changeJson);
+    Header[] producerTypeHeader = 
httpPostCapture.getValue().getHeaders("Producer-Type");
+    assertEquals(producerTypeHeader.length, 1);
+    assertEquals(producerTypeHeader[0].getName(), "Producer-Type");
+    assertEquals(producerTypeHeader[0].getValue(), "reliable");
+    Header[] contentTypeHeader = 
httpPostCapture.getValue().getHeaders("Content-Type");
+    assertEquals(contentTypeHeader.length, 1);
+    assertEquals(contentTypeHeader[0].getName(), "Content-Type");
+    assertEquals(contentTypeHeader[0].getValue(), 
"application/vnd.kafka.json.v1+json");
+    assertNotNull(httpPostCapture.getValue().getHeaders("Timestamp"));
   }
 
   @Test
-  public void testCallEndpoint() {
+  public void testCatchHttpClientException() throws Exception {
+    // IOException should be silenced.
+    Capture<HttpPost> httpPostCapture = createCapture();
+    expect(httpClient.execute(capture(httpPostCapture)))
+        .andThrow(new IOException());
     control.replay();
 
-    realWebhook.callEndpoint(changeJson);
+    webhook.taskChangedState(changeWithOldState);
   }
 
   @Test
-  public void testWebhookInfo() {
-    WebhookInfo webhookInfo = 
WebhookModule.parseWebhookConfig(WebhookModule.readWebhookFile());
-    assertEquals(webhookInfo.toString(),
+  public void testWebhookInfo() throws Exception {
+    WebhookInfo parsedWebhookInfo = WebhookModule.parseWebhookConfig(
+        WebhookModule.readWebhookFile());
+    assertEquals(parsedWebhookInfo.toString(),
         "WebhookInfo{headers={"
             + "Content-Type=application/vnd.kafka.json.v1+json, "
             + "Producer-Type=reliable"
             + "}, "
-            + "targetURL=http://localhost:5000/, "
-            + "connectTimeout=5"
+            + "targetURI=http://localhost:5000/, "
+            + "connectTimeoutMsec=50"
             + "}");
+    // Verifying all attributes were parsed correctly.
+    Map<String, String> headers = ImmutableMap.of(
+        "Content-Type", "application/vnd.kafka.json.v1+json",
+        "Producer-Type", "reliable");
+    assertEquals(parsedWebhookInfo.getHeaders(), headers);
+    URI targetURI = new URI("http://localhost:5000/";);
+    assertEquals(parsedWebhookInfo.getTargetURI(), targetURI);
+    Integer timeoutMsec = 50;
+    assertEquals(parsedWebhookInfo.getConnectonTimeoutMsec(), timeoutMsec);
+
     control.replay();
   }
 }

Reply via email to