Repository: flume
Updated Branches:
  refs/heads/trunk 5e52ac4ad -> a84491472


Add an HTTP sink

This patch adds an HTTP sink. Supports configurable backoff & retry for 
different HTTP status codes, and instrumentation.

This closes #84

Reviewers: Shang Wu, Jeff Holoman, Bessenyei Balázs Donát

(Ben Wheeler via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a8449147
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a8449147
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a8449147

Branch: refs/heads/trunk
Commit: a844914721f68282b6fce2b3b0006603bafd790d
Parents: 5e52ac4
Author: Ben Wheeler <[email protected]>
Authored: Tue Jan 24 08:08:42 2017 +0000
Committer: Bessenyei Balázs Donát <[email protected]>
Committed: Tue Jan 24 08:09:25 2017 +0000

----------------------------------------------------------------------
 .../flume/conf/sink/SinkConfiguration.java      |  10 +-
 .../org/apache/flume/conf/sink/SinkType.java    |   9 +-
 flume-ng-dist/pom.xml                           |   4 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  68 ++++
 flume-ng-sinks/flume-http-sink/pom.xml          | 136 +++++++
 .../org/apache/flume/sink/http/HttpSink.java    | 408 +++++++++++++++++++
 .../apache/flume/sink/http/package-info.java    |  23 ++
 .../apache/flume/sink/http/TestHttpSink.java    | 322 +++++++++++++++
 .../apache/flume/sink/http/TestHttpSinkIT.java  | 256 ++++++++++++
 .../src/test/resources/log4j.properties         |  28 ++
 flume-ng-sinks/pom.xml                          |   1 +
 pom.xml                                         |   7 +
 12 files changed, 1270 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
 
b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
index 1acd291..d6f4cbf 100644
--- 
a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
+++ 
b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
@@ -153,7 +153,15 @@ public class SinkConfiguration extends 
ComponentConfiguration {
      * Hive Sink
      * @see org.apache.flume.sink.hive.HiveSink
      */
-    HIVE("org.apache.flume.sink.hive.HiveSinkConfiguration");
+    HIVE("org.apache.flume.sink.hive.HiveSinkConfiguration"),
+
+    /**
+     * HTTP Sink
+     * @see org.apache.flume.sink.http.HttpSink
+     */
+    HTTP("org.apache.flume.sink.http.HttpSinkConfiguration");
+
+
 
     private final String sinkConfigurationName;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java 
b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
index edb9f46..bfae570 100644
--- 
a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
+++ 
b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
@@ -105,7 +105,14 @@ public enum SinkType {
    * Hive Sink
    * @see org.apache.flume.sink.hive.HiveSink
    */
-  HIVE("org.apache.flume.sink.hive.HiveSink");
+  HIVE("org.apache.flume.sink.hive.HiveSink"),
+
+  /**
+   * HTTP Sink
+   *
+   * @see org.apache.flume.sink.http.HttpSink
+   */
+  HTTP("org.apache.flume.sink.http.HttpSink");
 
   private final String sinkClassName;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index d684c47..3841467 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -154,6 +154,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.flume.flume-ng-sinks</groupId>
+      <artifactId>flume-http-sink</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume.flume-ng-sinks</groupId>
       <artifactId>flume-ng-elasticsearch-sink</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 6bc9dba..afa6625 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2916,6 +2916,74 @@ that the operating system user of the Flume processes 
has read privileges on the
     };
 
 
+HTTP Sink
+~~~~~~~~~
+
+This sink takes events from the channel and sends each one to a remote
+service using an HTTP POST request. The event content is sent as the POST
+body.
+
+Error handling behaviour of this sink depends on the HTTP response returned
+by the target server. The sink backoff/ready status is configurable, as is the
+transaction commit/rollback result and whether the event contributes to the
+successful event drain count.
+
+Any malformed HTTP response returned by the server where the status code is
+not readable will result in a backoff signal and the event is not consumed
+from the channel.
+
+Required properties are in **bold**.
+
+========================== ================= 
===========================================================================================
+Property Name              Default           Description
+========================== ================= 
===========================================================================================
+**channel**                --
+**type**                   --                The component type name, needs to 
be ``http``.
+**endpoint**               --                The fully qualified URL endpoint 
to POST to
+connectTimeout             5000              The socket connection timeout in 
milliseconds
+requestTimeout             5000              The maximum request processing 
time in milliseconds
+contentTypeHeader          text/plain        The HTTP Content-Type header
+acceptHeader               text/plain        The HTTP Accept header value
+defaultBackoff             true              Whether to backoff by default on 
receiving all HTTP status codes
+defaultRollback            true              Whether to rollback by default on 
receiving all HTTP status codes
+defaultIncrementMetrics    false             Whether to increment metrics by 
default on receiving all HTTP status codes
+backoff.CODE               --                Configures a specific backoff for 
an individual (e.g. 200) code or a group (e.g. 2XX) code
+rollback.CODE              --                Configures a specific rollback 
for an individual (e.g. 200) code or a group (e.g. 2XX) code
+incrementMetrics.CODE      --                Configures a specific metrics 
increment for an individual (e.g. 200) code or a group (e.g. 2XX) code
+========================== ================= 
===========================================================================================
+
+Note that the most specific HTTP status code match is used for the backoff,
+rollback and incrementMetrics configuration options. If there are configuration
+values for both 2XX and 200 status codes, then 200 HTTP codes will use the 200
+value, and all other HTTP codes in the 201-299 range will use the 2XX value.
+
+Any empty or null events are consumed without any request being made to the
+HTTP endpoint.
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+  a1.channels = c1
+  a1.sinks = k1
+  a1.sinks.k1.type = http
+  a1.sinks.k1.channel = c1
+  a1.sinks.k1.endpoint = http://localhost:8080/someuri
+  a1.sinks.k1.connectTimeout = 2000
+  a1.sinks.k1.requestTimeout = 2000
+  a1.sinks.k1.acceptHeader = application/json
+  a1.sinks.k1.contentTypeHeader = application/json
+  a1.sinks.k1.defaultBackoff = true
+  a1.sinks.k1.defaultRollback = true
+  a1.sinks.k1.defaultIncrementMetrics = false
+  a1.sinks.k1.backoff.4XX = false
+  a1.sinks.k1.rollback.4XX = false
+  a1.sinks.k1.incrementMetrics.4XX = true
+  a1.sinks.k1.backoff.200 = false
+  a1.sinks.k1.rollback.200 = false
+  a1.sinks.k1.incrementMetrics.200 = true
+
+
 Custom Sink
 ~~~~~~~~~~~
 

http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/flume-http-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-http-sink/pom.xml 
b/flume-ng-sinks/flume-http-sink/pom.xml
new file mode 100644
index 0000000..60dee65
--- /dev/null
+++ b/flume-ng-sinks/flume-http-sink/pom.xml
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-ng-sinks</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.8.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-ng-sinks</groupId>
+  <artifactId>flume-http-sink</artifactId>
+  <name>Flume HTTP/S Sink</name>
+
+  <properties>
+    <wiremock.version>1.53</wiremock.version>
+    <guava.version>18.0</guava.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>libthrift</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>libthrift</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.github.tomakehurst</groupId>
+      <artifactId>wiremock</artifactId>
+      <version>${wiremock.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.3.5</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
 
b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
new file mode 100644
index 0000000..9637326
--- /dev/null
+++ 
b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.http;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of an HTTP sink. Events are POSTed to an HTTP / HTTPS
+ * endpoint. The error handling behaviour is configurable, and can respond
+ * differently depending on the response status returned by the endpoint.
+ *
+ * Rollback of the Flume transaction, and backoff can be specified globally,
+ * then overridden for ranges (or individual) status codes.
+ */
+public class HttpSink extends AbstractSink implements Configurable {
+
+  /** Class logger. */
+  private static final Logger LOG = Logger.getLogger(HttpSink.class);
+
+  /** Lowest valid HTTP status code. */
+  private static final int HTTP_STATUS_CONTINUE = 100;
+
+  /** Default setting for the connection timeout when calling endpoint. */
+  private static final int DEFAULT_CONNECT_TIMEOUT = 5000;
+
+  /** Default setting for the request timeout when calling endpoint. */
+  private static final int DEFAULT_REQUEST_TIMEOUT = 5000;
+
+  /** Default setting for the HTTP content type header. */
+  private static final String DEFAULT_CONTENT_TYPE = "text/plain";
+
+  /** Default setting for the HTTP accept header. */
+  private static final String DEFAULT_ACCEPT_HEADER = "text/plain";
+
+  /** Endpoint URL to POST events to. */
+  private URL endpointUrl;
+
+  /** Counter used to monitor event throughput. */
+  private SinkCounter sinkCounter;
+
+  /** Actual connection timeout value in use. */
+  private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+
+  /** Actual request timeout value in use. */
+  private int requestTimeout = DEFAULT_REQUEST_TIMEOUT;
+
+  /** Actual content type header value in use. */
+  private String contentTypeHeader = DEFAULT_CONTENT_TYPE;
+
+  /** Actual accept header value in use. */
+  private String acceptHeader = DEFAULT_ACCEPT_HEADER;
+
+  /** Backoff value to use if a specific override is not defined. */
+  private boolean defaultBackoff;
+
+  /** Rollback value to use if a specific override is not defined. */
+  private boolean defaultRollback;
+
+  /** Increment metrics value to use if a specific override is not defined. */
+  private boolean defaultIncrementMetrics;
+
+  /**
+   * Holds all overrides for backoff. The key is a string of the format "500" or
+   * "5XX", and the value is the backoff value to use for the individual code,
+   * or code range.
+   */
+  private HashMap<String, Boolean> backoffOverrides = new HashMap<>();
+
+  /**
+   * Holds all overrides for rollback. The key is a string of the format "500"
+   * or "5XX", and the value is the rollback value to use for the individual
+   * code, or code range.
+   */
+  private HashMap<String, Boolean> rollbackOverrides = new HashMap<>();
+
+  /**
+   * Holds all overrides for increment metrics. The key is a string of the
+   * format "500" or "5XX", and the value is the increment metrics value to use
+   * for the individual code, or code range.
+   */
+  private HashMap<String, Boolean> incrementMetricsOverrides = new HashMap<>();
+
+  /** Used to create HTTP connections to the endpoint. */
+  private ConnectionBuilder connectionBuilder;
+
+  /**
+   * Reads the sink settings from the supplied context.
+   *
+   * <p>The endpoint URL, connect/request timeouts, Accept and Content-Type
+   * header values, the default backoff/rollback/incrementMetrics flags and
+   * the per-status-code overrides are all loaded here. A sink counter is
+   * created if none has been set yet, and a fresh connection builder is
+   * installed.
+   *
+   * @param context  the configuration to read the sink properties from
+   * @throws IllegalArgumentException if the endpoint is not a valid URL, or
+   *         if either timeout is zero or negative
+   */
+  @Override
+  public final void configure(final Context context) {
+    String configuredEndpoint = context.getString("endpoint", "");
+    LOG.info("Read endpoint URL from configuration : " + configuredEndpoint);
+
+    try {
+      endpointUrl = new URL(configuredEndpoint);
+    } catch (MalformedURLException e) {
+      throw new IllegalArgumentException("Endpoint URL invalid", e);
+    }
+
+    connectTimeout = context.getInteger("connectTimeout",
+        DEFAULT_CONNECT_TIMEOUT);
+
+    if (connectTimeout <= 0) {
+      throw new IllegalArgumentException(
+          "Connect timeout must be a non-zero and positive");
+    }
+    LOG.info("Using connect timeout : " + connectTimeout);
+
+    requestTimeout = context.getInteger("requestTimeout",
+        DEFAULT_REQUEST_TIMEOUT);
+
+    if (requestTimeout <= 0) {
+      throw new IllegalArgumentException(
+          "Request timeout must be a non-zero and positive");
+    }
+    LOG.info("Using request timeout : " + requestTimeout);
+
+    acceptHeader = context.getString("acceptHeader", DEFAULT_ACCEPT_HEADER);
+    LOG.info("Using Accept header value : " + acceptHeader);
+
+    contentTypeHeader = context.getString("contentTypeHeader",
+        DEFAULT_CONTENT_TYPE);
+    LOG.info("Using Content-Type header value : " + contentTypeHeader);
+
+    defaultBackoff = context.getBoolean("defaultBackoff", true);
+    LOG.info("Channel backoff by default is " + defaultBackoff);
+
+    defaultRollback = context.getBoolean("defaultRollback", true);
+    LOG.info("Transaction rollback by default is " + defaultRollback);
+
+    defaultIncrementMetrics = context.getBoolean("defaultIncrementMetrics",
+        false);
+    LOG.info("Incrementing metrics by default is " + defaultIncrementMetrics);
+
+    // configure() may be invoked more than once on the same instance. Drop the
+    // overrides from any earlier call so that removed keys do not linger and
+    // changed values are not rejected as duplicates by parseConfigOverrides.
+    backoffOverrides.clear();
+    rollbackOverrides.clear();
+    incrementMetricsOverrides.clear();
+
+    parseConfigOverrides("backoff", context, backoffOverrides);
+    parseConfigOverrides("rollback", context, rollbackOverrides);
+    parseConfigOverrides("incrementMetrics", context,
+        incrementMetricsOverrides);
+
+    if (this.sinkCounter == null) {
+      this.sinkCounter = new SinkCounter(this.getName());
+    }
+
+    connectionBuilder = new ConnectionBuilder();
+  }
+
+  /**
+   * Logs that the sink is starting and starts the sink counter.
+   *
+   * NOTE(review): super.start() is not invoked here - confirm that the
+   * lifecycle handling in AbstractSink does not need to run for this sink.
+   */
+  @Override
+  public final void start() {
+    LOG.info("Starting HttpSink");
+    sinkCounter.start();
+  }
+
+  /**
+   * Logs that the sink is stopping and stops the sink counter.
+   *
+   * NOTE(review): super.stop() is not invoked here - confirm that the
+   * lifecycle handling in AbstractSink does not need to run for this sink.
+   */
+  @Override
+  public final void stop() {
+    LOG.info("Stopping HttpSink");
+    sinkCounter.stop();
+  }
+
+  /**
+   * Takes a single event from the channel and POSTs its body to the
+   * configured endpoint.
+   *
+   * <p>A missing event, or an event with a null or empty body, is committed
+   * without any request being made and BACKOFF is returned. Otherwise the
+   * HTTP status code returned by the endpoint selects, through the configured
+   * overrides and defaults, whether the transaction is rolled back or
+   * committed, whether BACKOFF or READY is returned, and whether the drain
+   * success counter is incremented. A status code below 100, or any I/O
+   * failure, rolls the transaction back and returns BACKOFF.
+   *
+   * @return READY or BACKOFF, as described above
+   * @throws EventDeliveryException declared by the overridden method; this
+   *         implementation does not throw it directly
+   */
+  @Override
+  public final Status process() throws EventDeliveryException {
+    Status status = null;
+    OutputStream outputStream = null;
+
+    Channel ch = getChannel();
+    Transaction txn = ch.getTransaction();
+    txn.begin();
+
+    try {
+      Event event = ch.take();
+
+      byte[] eventBody = null;
+      if (event != null) {
+        eventBody = event.getBody();
+      }
+
+      if (eventBody != null && eventBody.length > 0) {
+        sinkCounter.incrementEventDrainAttemptCount();
+
+        // Only convert the body to a String when it is actually logged.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Sending request : " + new String(eventBody));
+        }
+
+        try {
+          HttpURLConnection connection = connectionBuilder.getConnection();
+
+          outputStream = connection.getOutputStream();
+          outputStream.write(eventBody);
+          outputStream.flush();
+          outputStream.close();
+
+          int httpStatusCode = connection.getResponseCode();
+          LOG.debug("Got status code : " + httpStatusCode);
+
+          // getInputStream() throws an IOException for 4XX/5XX responses,
+          // which would skip the per-status handling below and always roll
+          // back and back off. For error responses release the error stream
+          // instead, when the server supplied one.
+          if (httpStatusCode < HttpURLConnection.HTTP_BAD_REQUEST) {
+            connection.getInputStream().close();
+          } else if (connection.getErrorStream() != null) {
+            connection.getErrorStream().close();
+          }
+          LOG.debug("Response processed and closed");
+
+          if (httpStatusCode >= HTTP_STATUS_CONTINUE) {
+            String httpStatusString = String.valueOf(httpStatusCode);
+
+            boolean shouldRollback = findOverrideValue(httpStatusString,
+                rollbackOverrides, defaultRollback);
+
+            if (shouldRollback) {
+              txn.rollback();
+            } else {
+              txn.commit();
+            }
+
+            boolean shouldBackoff = findOverrideValue(httpStatusString,
+                backoffOverrides, defaultBackoff);
+
+            if (shouldBackoff) {
+              status = Status.BACKOFF;
+            } else {
+              status = Status.READY;
+            }
+
+            boolean shouldIncrementMetrics = findOverrideValue(
+                httpStatusString, incrementMetricsOverrides,
+                defaultIncrementMetrics);
+
+            if (shouldIncrementMetrics) {
+              sinkCounter.incrementEventDrainSuccessCount();
+            }
+
+            if (shouldRollback) {
+              if (shouldBackoff) {
+                LOG.info(String.format("Got status code %d from HTTP server."
+                    + " Rolled back event and backed off.", httpStatusCode));
+              } else {
+                LOG.info(String.format("Got status code %d from HTTP server."
+                    + " Rolled back event for retry.", httpStatusCode));
+              }
+            }
+          } else {
+            // A status code below 100 means the response could not be parsed.
+            txn.rollback();
+            status = Status.BACKOFF;
+
+            LOG.warn("Malformed response returned from server, retrying");
+          }
+
+        } catch (IOException e) {
+          txn.rollback();
+          status = Status.BACKOFF;
+
+          LOG.error("Error opening connection, or request timed out", e);
+        }
+
+      } else {
+        // Nothing to send: consume the (empty or absent) event and back off.
+        txn.commit();
+        status = Status.BACKOFF;
+
+        LOG.warn("Processed empty event");
+      }
+
+    } catch (Throwable t) {
+      txn.rollback();
+      status = Status.BACKOFF;
+
+      LOG.error("Error sending HTTP request, retrying", t);
+
+      // re-throw all Errors
+      if (t instanceof Error) {
+        throw (Error) t;
+      }
+
+    } finally {
+      txn.close();
+
+      // Covers the case where a failure occurred before the explicit close.
+      if (outputStream != null) {
+        try {
+          outputStream.close();
+        } catch (IOException e) {
+          // ignore errors
+        }
+      }
+    }
+
+    return status;
+  }
+
+  /**
+   * Loads every "propertyName.CODE" entry from the context into the given
+   * override map, converting each configured value to a Boolean. A key that
+   * is already present in the map is left untouched and a warning is logged.
+   *
+   * @param propertyName  the prefix of the config property names
+   * @param context       the context to use to read config properties
+   * @param override      the override Map to store results in
+   */
+  private void parseConfigOverrides(final String propertyName,
+                                    final Context context,
+                                    final Map<String, Boolean> override) {
+
+    ImmutableMap<String, String> subProperties =
+        context.getSubProperties(propertyName + ".");
+    if (subProperties == null) {
+      return;
+    }
+
+    for (Map.Entry<String, String> entry : subProperties.entrySet()) {
+      String code = entry.getKey();
+      String rawValue = entry.getValue();
+
+      LOG.info(String.format("Read %s value for status code %s as %s",
+          propertyName, code, rawValue));
+
+      if (!override.containsKey(code)) {
+        override.put(code, Boolean.valueOf(rawValue));
+        continue;
+      }
+
+      LOG.warn(String.format("Ignoring duplicate config value for %s.%s",
+          propertyName, code));
+    }
+  }
+
+  /**
+   * Looks up the override that applies to a status code. An entry for the
+   * exact code (for example "503") wins over an entry for its group (for
+   * example "5XX"); when neither is present the default is returned.
+   *
+   * @param statusCode    the String representation of the HTTP status code
+   * @param overrides     the map of status code overrides
+   * @param defaultValue  the default value to use if no override is configured
+   *
+   * @return the value of the most specific match to the given status code
+   */
+  private boolean findOverrideValue(final String statusCode,
+                                    final HashMap<String, Boolean> overrides,
+                                    final boolean defaultValue) {
+
+    Boolean exactMatch = overrides.get(statusCode);
+    if (exactMatch != null) {
+      return exactMatch;
+    }
+
+    // Group keys are the first digit of the code followed by "XX".
+    Boolean groupMatch = overrides.get(statusCode.charAt(0) + "XX");
+    if (groupMatch != null) {
+      return groupMatch;
+    }
+
+    return defaultValue;
+  }
+
+  /**
+   * Replaces the connection builder used to open connections to the endpoint.
+   * Note that configure() installs a new default builder, so a builder set
+   * before configure() is called is overwritten.
+   *
+   * @param builder  the new value
+   */
+  final void setConnectionBuilder(final ConnectionBuilder builder) {
+    this.connectionBuilder = builder;
+  }
+
+  /**
+   * Replaces the sinkCounter used to record event throughput. configure()
+   * only creates its own counter when none has been set, so a counter
+   * supplied here beforehand is kept.
+   *
+   * @param newSinkCounter  the new value
+   */
+  final void setSinkCounter(final SinkCounter newSinkCounter) {
+    this.sinkCounter = newSinkCounter;
+  }
+
+  /**
+   * Factory for endpoint connections. It is a separate, non-final class so
+   * that the way connections are built can be replaced or extended.
+   */
+  class ConnectionBuilder {
+
+    /**
+     * Opens a connection to the configured endpoint URL, prepared for a POST
+     * request with the configured Content-Type and Accept headers and the
+     * configured connect and read timeouts, and then connects it.
+     *
+     * @return the connection object
+     * @throws IOException on any connection error
+     */
+    public HttpURLConnection getConnection() throws IOException {
+      HttpURLConnection conn =
+          (HttpURLConnection) endpointUrl.openConnection();
+
+      // Timeouts and stream directions.
+      conn.setConnectTimeout(connectTimeout);
+      conn.setReadTimeout(requestTimeout);
+      conn.setDoInput(true);
+      conn.setDoOutput(true);
+
+      // Request line and headers.
+      conn.setRequestMethod("POST");
+      conn.setRequestProperty("Content-Type", contentTypeHeader);
+      conn.setRequestProperty("Accept", acceptHeader);
+
+      conn.connect();
+      return conn;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/package-info.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/package-info.java
 
b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/package-info.java
new file mode 100644
index 0000000..5b88f01
--- /dev/null
+++ 
b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This package provides an HTTP sink for Flume so that events can be sent out
+ * to a target HTTP endpoint.
+ */
+package org.apache.flume.sink.http;

http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
 
b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
new file mode 100644
index 0000000..16cb6e8
--- /dev/null
+++ 
b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.http;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.Transaction;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestHttpSink {
+
+  private static final Integer DEFAULT_REQUEST_TIMEOUT = 5000;
+  private static final Integer DEFAULT_CONNECT_TIMEOUT = 5000;
+  private static final String DEFAULT_ACCEPT_HEADER = "text/plain";
+  private static final String DEFAULT_CONTENT_TYPE_HEADER = "text/plain";
+
+  @Mock
+  private SinkCounter sinkCounter;
+
+  @Mock
+  private Context configContext;
+
+  @Mock
+  private Channel channel;
+
+  @Mock
+  private Transaction transaction;
+
+  @Mock
+  private Event event;
+
+  @Mock
+  private HttpURLConnection httpURLConnection;
+
+  @Mock
+  private OutputStream outputStream;
+
+  @Mock
+  private InputStream inputStream;
+
+  @Test
+  public void ensureAllConfigurationOptionsRead() {
+    whenDefaultStringConfig();
+    whenDefaultBooleanConfig();
+    when(configContext.getInteger(eq("connectTimeout"), Mockito.anyInt())).thenReturn(1000);
+    when(configContext.getInteger(eq("requestTimeout"), Mockito.anyInt())).thenReturn(1000);
+
+    new HttpSink().configure(configContext);
+
+    verify(configContext).getString("endpoint", "");
+    verify(configContext).getInteger(eq("connectTimeout"), Mockito.anyInt());
+    verify(configContext).getInteger(eq("requestTimeout"), Mockito.anyInt());
+    verify(configContext).getString(eq("acceptHeader"), Mockito.anyString());
+    verify(configContext).getString(eq("contentTypeHeader"), Mockito.anyString());
+    verify(configContext).getBoolean("defaultBackoff", true);
+    verify(configContext).getBoolean("defaultRollback", true);
+    verify(configContext).getBoolean("defaultIncrementMetrics", false);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void ensureExceptionIfEndpointUrlEmpty() {
+    when(configContext.getString("endpoint", "")).thenReturn("");
+    new HttpSink().configure(configContext);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void ensureExceptionIfEndpointUrlInvalid() {
+    when(configContext.getString("endpoint", "")).thenReturn("invalid url");
+    new HttpSink().configure(configContext);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void ensureExceptionIfConnectTimeoutNegative() {
+    whenDefaultStringConfig();
+    when(configContext.getInteger("connectTimeout", 1000)).thenReturn(-1000);
+    when(configContext.getInteger(eq("requestTimeout"), Mockito.anyInt())).thenReturn(1000);
+    new HttpSink().configure(configContext);
+  }
+
+  @Test
+  public void ensureDefaultConnectTimeoutCorrect() {
+    whenDefaultStringConfig();
+    when(configContext.getInteger("connectTimeout", DEFAULT_CONNECT_TIMEOUT)).thenReturn(1000);
+    when(configContext.getInteger(eq("requestTimeout"), Mockito.anyInt())).thenReturn(1000);
+    new HttpSink().configure(configContext);
+    verify(configContext).getInteger("connectTimeout", DEFAULT_CONNECT_TIMEOUT);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void ensureExceptionIfRequestTimeoutNegative() {
+    whenDefaultStringConfig();
+    when(configContext.getInteger("requestTimeout", 1000)).thenReturn(-1000);
+    when(configContext.getInteger(eq("connectTimeout"), Mockito.anyInt())).thenReturn(1000);
+    new HttpSink().configure(configContext);
+  }
+
+  @Test
+  public void ensureDefaultRequestTimeoutCorrect() {
+    whenDefaultStringConfig();
+    when(configContext.getInteger("requestTimeout", DEFAULT_REQUEST_TIMEOUT)).thenReturn(1000);
+    when(configContext.getInteger(eq("connectTimeout"), Mockito.anyInt())).thenReturn(1000);
+    new HttpSink().configure(configContext);
+    verify(configContext).getInteger("requestTimeout", DEFAULT_REQUEST_TIMEOUT);
+  }
+
+  @Test
+  public void ensureDefaultAcceptHeaderCorrect() {
+    whenDefaultTimeouts();
+    whenDefaultStringConfig();
+    new HttpSink().configure(configContext);
+    verify(configContext).getString("acceptHeader", DEFAULT_ACCEPT_HEADER);
+  }
+
+  @Test
+  public void ensureDefaultContentTypeHeaderCorrect() {
+    whenDefaultTimeouts();
+    whenDefaultStringConfig();
+    new HttpSink().configure(configContext);
+    verify(configContext).getString("contentTypeHeader", DEFAULT_CONTENT_TYPE_HEADER);
+  }
+
+  @Test
+  public void ensureBackoffOnNullEvent() throws Exception {
+    when(channel.take()).thenReturn(null);
+    executeWithMocks(true);
+  }
+
+  @Test
+  public void ensureBackoffOnNullEventBody() throws Exception {
+    when(channel.take()).thenReturn(event);
+    when(event.getBody()).thenReturn(null);
+    executeWithMocks(true);
+  }
+
+  @Test
+  public void ensureBackoffOnEmptyEvent() throws Exception {
+    when(channel.take()).thenReturn(event);
+    when(event.getBody()).thenReturn(new byte[]{});
+    executeWithMocks(true);
+  }
+
+  @Test
+  public void ensureRollbackBackoffAndIncrementMetricsIfConfigured() throws Exception {
+    when(channel.take()).thenReturn(event);
+    when(event.getBody()).thenReturn("something".getBytes());
+
+    Context context = new Context();
+    context.put("defaultRollback", "true");
+    context.put("defaultBackoff", "true");
+    context.put("defaultIncrementMetrics", "true");
+
+    executeWithMocks(false, Status.BACKOFF, true, true, context, HttpURLConnection.HTTP_OK);
+  }
+
+  @Test
+  public void ensureCommitReadyAndNoIncrementMetricsIfConfigured() throws Exception {
+    when(channel.take()).thenReturn(event);
+    when(event.getBody()).thenReturn("something".getBytes());
+
+    Context context = new Context();
+    context.put("defaultRollback", "false");
+    context.put("defaultBackoff", "false");
+    context.put("defaultIncrementMetrics", "false");
+
+    executeWithMocks(true, Status.READY, false, false, context, HttpURLConnection.HTTP_OK);
+  }
+
+  @Test
+  public void ensureSingleStatusConfigurationCorrectlyUsed() throws Exception {
+    when(channel.take()).thenReturn(event);
+    when(event.getBody()).thenReturn("something".getBytes());
+
+    Context context = new Context();
+    context.put("defaultRollback", "true");
+    context.put("defaultBackoff", "true");
+    context.put("defaultIncrementMetrics", "false");
+    context.put("rollback.200", "false");
+    context.put("backoff.200", "false");
+    context.put("incrementMetrics.200", "true");
+
+    executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_OK);
+  }
+
+  @Test
+  public void ensureGroupConfigurationCorrectlyUsed() throws Exception {
+    when(channel.take()).thenReturn(event);
+    when(event.getBody()).thenReturn("something".getBytes());
+
+    Context context = new Context();
+    context.put("defaultRollback", "true");
+    context.put("defaultBackoff", "true");
+    context.put("defaultIncrementMetrics", "false");
+    context.put("rollback.2XX", "false");
+    context.put("backoff.2XX", "false");
+    context.put("incrementMetrics.2XX", "true");
+
+    executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_OK);
+    executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_NO_CONTENT);
+  }
+
+  @Test
+  public void ensureSingleStatusConfigurationOverridesGroupConfigurationCorrectly()
+      throws Exception {
+
+    when(channel.take()).thenReturn(event);
+    when(event.getBody()).thenReturn("something".getBytes());
+
+    Context context = new Context();
+    context.put("rollback.2XX", "false");
+    context.put("backoff.2XX", "false");
+    context.put("incrementMetrics.2XX", "true");
+    context.put("rollback.200", "true");
+    context.put("backoff.200", "true");
+    context.put("incrementMetrics.200", "false");
+
+    executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_NO_CONTENT);
+    executeWithMocks(false, Status.BACKOFF, false, true, context, HttpURLConnection.HTTP_OK);
+  }
+
+  private void executeWithMocks(boolean commit) throws Exception {
+    Context context = new Context();
+    executeWithMocks(commit, Status.BACKOFF, false, false, context, HttpURLConnection.HTTP_OK);
+  }
+
+  private void executeWithMocks(boolean expectedCommit, Status expectedStatus,
+                                boolean expectedIncrementSuccessMetrics,
+                                boolean expectedIncrementAttemptMetrics,
+                                Context context, int httpStatus)
+      throws Exception {
+
+    context.put("endpoint", "http://localhost:8080/endpoint");
+
+    HttpSink httpSink = new HttpSink();
+    httpSink.configure(context);
+    httpSink.setConnectionBuilder(httpSink.new ConnectionBuilder() {
+      @Override
+      public HttpURLConnection getConnection() throws IOException {
+        return httpURLConnection;
+      }
+    });
+    httpSink.setChannel(channel);
+    httpSink.setSinkCounter(sinkCounter);
+
+    when(channel.getTransaction()).thenReturn(transaction);
+    when(httpURLConnection.getOutputStream()).thenReturn(outputStream);
+    when(httpURLConnection.getInputStream()).thenReturn(inputStream);
+    when(httpURLConnection.getResponseCode()).thenReturn(httpStatus);
+
+    Status actualStatus = httpSink.process();
+
+    assert (actualStatus == expectedStatus);
+
+    inOrder(transaction).verify(transaction).begin();
+
+    if (expectedIncrementAttemptMetrics) {
+      inOrder(sinkCounter).verify(sinkCounter).incrementEventDrainAttemptCount();
+    }
+
+    if (expectedCommit) {
+      inOrder(transaction).verify(transaction).commit();
+    } else {
+      inOrder(transaction).verify(transaction).rollback();
+    }
+
+    if (expectedIncrementSuccessMetrics) {
+      inOrder(sinkCounter).verify(sinkCounter).incrementEventDrainSuccessCount();
+    }
+
+    inOrder(transaction).verify(transaction).close();
+  }
+
+  private void whenDefaultStringConfig() {
+    when(configContext.getString("endpoint", "")).thenReturn("http://test.abc/");
+    when(configContext.getString("acceptHeader", "")).thenReturn("test/accept");
+    when(configContext.getString("contentTypeHeader", "")).thenReturn("test/content");
+  }
+
+  private void whenDefaultBooleanConfig() {
+    when(configContext.getBoolean("defaultBackoff", true)).thenReturn(true);
+    when(configContext.getBoolean("defaultRollback", true)).thenReturn(true);
+    when(configContext.getBoolean("defaultIncrementMetrics", false)).thenReturn(true);
+  }
+
+  private void whenDefaultTimeouts() {
+    when(configContext.getInteger(eq("requestTimeout"), Mockito.anyInt())).thenReturn(1000);
+    when(configContext.getInteger(eq("connectTimeout"), Mockito.anyInt())).thenReturn(1000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java
new file mode 100644
index 0000000..74dcf1d
--- /dev/null
+++ b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.http;
+
+import com.github.tomakehurst.wiremock.global.RequestDelaySpec;
+import com.github.tomakehurst.wiremock.http.Fault;
+import com.github.tomakehurst.wiremock.http.Request;
+import com.github.tomakehurst.wiremock.http.RequestListener;
+import com.github.tomakehurst.wiremock.http.Response;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import org.apache.flume.Context;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.event.SimpleEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.*;
+import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
+import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED;
+import static org.apache.flume.Sink.Status;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Runs a set of tests against a mocked HTTP endpoint.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestHttpSinkIT {
+
+  private static final int RESPONSE_TIMEOUT = 4000;
+  private static final int CONNECT_TIMEOUT = 2500;
+
+  private MemoryChannel channel;
+
+  private HttpSink httpSink;
+
+  @Before
+  public void setupSink() {
+    if (httpSink == null) {
+      Context httpSinkContext = new Context();
+      httpSinkContext.put("endpoint", "http://localhost:8080/endpoint");
+      httpSinkContext.put("requestTimeout", "2000");
+      httpSinkContext.put("connectTimeout", "1500");
+      httpSinkContext.put("acceptHeader", "application/json");
+      httpSinkContext.put("contentTypeHeader", "application/json");
+      httpSinkContext.put("backoff.200", "false");
+      httpSinkContext.put("rollback.200", "false");
+      httpSinkContext.put("incrementMetrics.200", "true");
+
+      Context memoryChannelContext = new Context();
+
+      channel = new MemoryChannel();
+      channel.configure(memoryChannelContext);
+      channel.start();
+
+      httpSink = new HttpSink();
+      httpSink.configure(httpSinkContext);
+      httpSink.setChannel(channel);
+      httpSink.start();
+    }
+  }
+
+  @After
+  public void waitForShutdown() throws InterruptedException {
+    httpSink.stop();
+    new CountDownLatch(1).await(500, TimeUnit.MILLISECONDS);
+  }
+
+  @Rule
+  public WireMockRule service = new WireMockRule(wireMockConfig().port(8080));
+
+  @Test
+  public void ensureSuccessfulMessageDelivery() throws Exception {
+    service.stubFor(post(urlEqualTo("/endpoint"))
+        .withRequestBody(equalToJson(event("SUCCESS")))
+        .willReturn(aResponse().withStatus(200)));
+
+    addEventToChannel(event("SUCCESS"));
+
+    service.verify(1, postRequestedFor(urlEqualTo("/endpoint"))
+        .withRequestBody(equalToJson(event("SUCCESS"))));
+  }
+
+  @Test
+  public void ensureEventsResentOn503Failure() throws Exception {
+    String errorScenario = "Error Scenario";
+
+    service.stubFor(post(urlEqualTo("/endpoint"))
+        .inScenario(errorScenario)
+        .whenScenarioStateIs(STARTED)
+        .withRequestBody(equalToJson(event("TRANSIENT_ERROR")))
+        .willReturn(aResponse().withStatus(503))
+        .willSetStateTo("Error Sent"));
+
+    service.stubFor(post(urlEqualTo("/endpoint"))
+        .inScenario(errorScenario)
+        .whenScenarioStateIs("Error Sent")
+        .withRequestBody(equalToJson(event("TRANSIENT_ERROR")))
+        .willReturn(aResponse().withStatus(200)));
+
+    addEventToChannel(event("TRANSIENT_ERROR"), Status.BACKOFF);
+    addEventToChannel(event("TRANSIENT_ERROR"), Status.READY);
+
+    service.verify(2, postRequestedFor(urlEqualTo("/endpoint"))
+        .withRequestBody(equalToJson(event("TRANSIENT_ERROR"))));
+  }
+
+  @Test
+  public void ensureEventsResentOnNetworkFailure() throws Exception {
+    String errorScenario = "Error Scenario";
+
+    service.stubFor(post(urlEqualTo("/endpoint"))
+        .inScenario(errorScenario)
+        .whenScenarioStateIs(STARTED)
+        .withRequestBody(equalToJson(event("NETWORK_ERROR")))
+        .willReturn(aResponse().withFault(Fault.RANDOM_DATA_THEN_CLOSE))
+        .willSetStateTo("Error Sent"));
+
+    service.stubFor(post(urlEqualTo("/endpoint"))
+        .inScenario(errorScenario)
+        .whenScenarioStateIs("Error Sent")
+        .withRequestBody(equalToJson(event("NETWORK_ERROR")))
+        .willReturn(aResponse().withStatus(200)));
+
+    addEventToChannel(event("NETWORK_ERROR"), Status.BACKOFF);
+    addEventToChannel(event("NETWORK_ERROR"), Status.READY);
+
+    service.verify(2, postRequestedFor(urlEqualTo("/endpoint"))
+        .withRequestBody(equalToJson(event("NETWORK_ERROR"))));
+  }
+
+  @Test
+  public void ensureEventsResentOnConnectionTimeout() throws Exception {
+    final CountDownLatch firstRequestReceived = new CountDownLatch(1);
+
+    service.addSocketAcceptDelay(new RequestDelaySpec(CONNECT_TIMEOUT));
+    service.addMockServiceRequestListener(new RequestListener() {
+      @Override
+      public void requestReceived(Request request, Response response) {
+        service.addSocketAcceptDelay(new RequestDelaySpec(0));
+        firstRequestReceived.countDown();
+      }
+    });
+
+    service.stubFor(post(urlEqualTo("/endpoint"))
+        .withRequestBody(equalToJson(event("SLOW_SOCKET")))
+        .willReturn(aResponse().withStatus(200)));
+
+    addEventToChannel(event("SLOW_SOCKET"), Status.BACKOFF);
+
+    // wait until the socket is connected
+    firstRequestReceived.await(2000, TimeUnit.MILLISECONDS);
+
+    addEventToChannel(event("SLOW_SOCKET"), Status.READY);
+
+    service.verify(2, postRequestedFor(urlEqualTo("/endpoint"))
+        .withRequestBody(equalToJson(event("SLOW_SOCKET"))));
+  }
+
+  @Test
+  public void ensureEventsResentOnRequestTimeout() throws Exception {
+    String errorScenario = "Error Scenario";
+
+    service.stubFor(post(urlEqualTo("/endpoint"))
+        .inScenario(errorScenario)
+        .whenScenarioStateIs(STARTED)
+        .withRequestBody(equalToJson(event("SLOW_RESPONSE")))
+        .willReturn(aResponse().withFixedDelay(RESPONSE_TIMEOUT).withStatus(200))
+        .willSetStateTo("Slow Response Sent"));
+
+    service.stubFor(post(urlEqualTo("/endpoint"))
+        .inScenario(errorScenario)
+        .whenScenarioStateIs("Slow Response Sent")
+        .withRequestBody(equalToJson(event("SLOW_RESPONSE")))
+        .willReturn(aResponse().withStatus(200)));
+
+    addEventToChannel(event("SLOW_RESPONSE"), Status.BACKOFF);
+    addEventToChannel(event("SLOW_RESPONSE"), Status.READY);
+
+    service.verify(2, postRequestedFor(urlEqualTo("/endpoint"))
+        .withRequestBody(equalToJson(event("SLOW_RESPONSE"))));
+  }
+
+  @Test
+  public void ensureHttpConnectionReusedForSuccessfulRequests() throws Exception {
+    // we should only get one delay when establishing a connection
+    service.addSocketAcceptDelay(new RequestDelaySpec(1000));
+
+    service.stubFor(post(urlEqualTo("/endpoint"))
+        .withRequestBody(equalToJson(event("SUCCESS")))
+        .willReturn(aResponse().withStatus(200)));
+
+    long startTime = System.currentTimeMillis();
+
+    addEventToChannel(event("SUCCESS"), Status.READY);
+    addEventToChannel(event("SUCCESS"), Status.READY);
+    addEventToChannel(event("SUCCESS"), Status.READY);
+
+    long endTime = System.currentTimeMillis();
+    assertTrue("Test should have completed faster", endTime - startTime < 2500);
+
+    service.verify(3, postRequestedFor(urlEqualTo("/endpoint"))
+        .withRequestBody(equalToJson(event("SUCCESS"))));
+  }
+
+  private void addEventToChannel(String line) throws EventDeliveryException {
+    addEventToChannel(line, Status.READY);
+  }
+
+  private void addEventToChannel(String line, Status expectedStatus)
+      throws EventDeliveryException {
+
+    SimpleEvent event = new SimpleEvent();
+    event.setBody(line.getBytes());
+
+    Transaction channelTransaction = channel.getTransaction();
+    channelTransaction.begin();
+    channel.put(event);
+    channelTransaction.commit();
+    channelTransaction.close();
+
+    Sink.Status status = httpSink.process();
+
+    assertEquals(expectedStatus, status);
+  }
+
+  private String event(String id) {
+    return "{'id':'" + id + "'}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/flume-http-sink/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-http-sink/src/test/resources/log4j.properties b/flume-ng-sinks/flume-http-sink/src/test/resources/log4j.properties
new file mode 100644
index 0000000..783022d
--- /dev/null
+++ b/flume-ng-sinks/flume-http-sink/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+log4j.rootLogger = ALL, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+
+log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
+log4j.appender.flume.Hostname = 127.0.0.1
+log4j.appender.flume.Port = 4141
+log4j.appender.flume.layout=org.apache.log4j.PatternLayout

http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml
index b49b74b..9072572 100644
--- a/flume-ng-sinks/pom.xml
+++ b/flume-ng-sinks/pom.xml
@@ -47,6 +47,7 @@ limitations under the License.
     <module>flume-ng-elasticsearch-sink</module>
     <module>flume-ng-morphline-solr-sink</module>
     <module>flume-ng-kafka-sink</module>
+    <module>flume-http-sink</module>
   </modules>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f62c99a..e823e3a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1162,6 +1162,13 @@ limitations under the License.
         <version>1.8.0-SNAPSHOT</version>
       </dependency>
 
+
+      <dependency>
+        <groupId>org.apache.flume.flume-ng-sinks</groupId>
+        <artifactId>flume-http-sink</artifactId>
+        <version>1.8.0-SNAPSHOT</version>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.flume.flume-ng-sinks</groupId>
         <artifactId>flume-irc-sink</artifactId>

Reply via email to