Repository: flume Updated Branches: refs/heads/trunk 5e52ac4ad -> a84491472
Add an HTTP sink This patch adds an HTTP sink. Supports configurable backoff & retry for different HTTP status codes, and instrumentation. This closes #84 Reviewers: Shang Wu, Jeff Holoman, Bessenyei Balázs Donát (Ben Wheeler via Bessenyei Balázs Donát) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a8449147 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a8449147 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a8449147 Branch: refs/heads/trunk Commit: a844914721f68282b6fce2b3b0006603bafd790d Parents: 5e52ac4 Author: Ben Wheeler <[email protected]> Authored: Tue Jan 24 08:08:42 2017 +0000 Committer: Bessenyei Balázs Donát <[email protected]> Committed: Tue Jan 24 08:09:25 2017 +0000 ---------------------------------------------------------------------- .../flume/conf/sink/SinkConfiguration.java | 10 +- .../org/apache/flume/conf/sink/SinkType.java | 9 +- flume-ng-dist/pom.xml | 4 + flume-ng-doc/sphinx/FlumeUserGuide.rst | 68 ++++ flume-ng-sinks/flume-http-sink/pom.xml | 136 +++++++ .../org/apache/flume/sink/http/HttpSink.java | 408 +++++++++++++++++++ .../apache/flume/sink/http/package-info.java | 23 ++ .../apache/flume/sink/http/TestHttpSink.java | 322 +++++++++++++++ .../apache/flume/sink/http/TestHttpSinkIT.java | 256 ++++++++++++ .../src/test/resources/log4j.properties | 28 ++ flume-ng-sinks/pom.xml | 1 + pom.xml | 7 + 12 files changed, 1270 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java index 1acd291..d6f4cbf 100644 --- 
a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java @@ -153,7 +153,15 @@ public class SinkConfiguration extends ComponentConfiguration { * Hive Sink * @see org.apache.flume.sink.hive.HiveSink */ - HIVE("org.apache.flume.sink.hive.HiveSinkConfiguration"); + HIVE("org.apache.flume.sink.hive.HiveSinkConfiguration"), + + /** + * HTTP Sink + * @see org.apache.flume.sink.http.HttpSink + */ + HTTP("org.apache.flume.sink.http.HttpSinkConfiguration"); + + private final String sinkConfigurationName; http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java index edb9f46..bfae570 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java @@ -105,7 +105,14 @@ public enum SinkType { * Hive Sink * @see org.apache.flume.sink.hive.HiveSink */ - HIVE("org.apache.flume.sink.hive.HiveSink"); + HIVE("org.apache.flume.sink.hive.HiveSink"), + + /** + * HTTP Sink + * + * @see org.apache.flume.sink.http.HttpSink + */ + HTTP("org.apache.flume.sink.http.HttpSink"); private final String sinkClassName; http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml index d684c47..3841467 100644 --- a/flume-ng-dist/pom.xml +++ b/flume-ng-dist/pom.xml @@ -154,6 +154,10 @@ </dependency> <dependency> <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-http-sink</artifactId> + 
</dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> <artifactId>flume-ng-elasticsearch-sink</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 6bc9dba..afa6625 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2916,6 +2916,74 @@ that the operating system user of the Flume processes has read privileges on the }; +HTTP Sink +~~~~~~~~~ + +This sink takes events from the channel and +sends those events to a remote service using an HTTP POST request. The event +content is sent as the POST body. + +Error handling behaviour of this sink depends on the HTTP response returned +by the target server. The sink backoff/ready status is configurable, as is the +transaction commit/rollback result and whether the event contributes to the +successful event drain count. + +Any malformed HTTP response returned by the server where the status code is +not readable will result in a backoff signal and the event is not consumed +from the channel. + +Required properties are in **bold**. + +========================== ================= =========================================================================================== +Property Name Default Description +========================== ================= =========================================================================================== +**channel** -- +**type** -- The component type name, needs to be ``http``. 
+**endpoint** -- The fully qualified URL endpoint to POST to +connectTimeout 5000 The socket connection timeout in milliseconds +requestTimeout 5000 The maximum request processing time in milliseconds +contentTypeHeader text/plain The HTTP Content-Type header +acceptHeader text/plain The HTTP Accept header value +defaultBackoff true Whether to back off by default on receiving all HTTP status codes +defaultRollback true Whether to roll back by default on receiving all HTTP status codes +defaultIncrementMetrics false Whether to increment metrics by default on receiving all HTTP status codes +backoff.CODE -- Configures a specific backoff for an individual (e.g. 200) code or a group (e.g. 2XX) code +rollback.CODE -- Configures a specific rollback for an individual (e.g. 200) code or a group (e.g. 2XX) code +incrementMetrics.CODE -- Configures a specific metrics increment for an individual (e.g. 200) code or a group (e.g. 2XX) code +========================== ================= =========================================================================================== + +Note that the most specific HTTP status code match is used for the backoff, +rollback and incrementMetrics configuration options. If there are configuration +values for both 2XX and 200 status codes, then 200 HTTP codes will use the 200 +value, and all other HTTP codes in the 201-299 range will use the 2XX value. + +Any empty or null events are consumed without any request being made to the +HTTP endpoint. + +Example for agent named a1: + +.. 
code-block:: properties + + a1.channels = c1 + a1.sinks = k1 + a1.sinks.k1.type = http + a1.sinks.k1.channel = c1 + a1.sinks.k1.endpoint = http://localhost:8080/someuri + a1.sinks.k1.connectTimeout = 2000 + a1.sinks.k1.requestTimeout = 2000 + a1.sinks.k1.acceptHeader = application/json + a1.sinks.k1.contentTypeHeader = application/json + a1.sinks.k1.defaultBackoff = true + a1.sinks.k1.defaultRollback = true + a1.sinks.k1.defaultIncrementMetrics = false + a1.sinks.k1.backoff.4XX = false + a1.sinks.k1.rollback.4XX = false + a1.sinks.k1.incrementMetrics.4XX = true + a1.sinks.k1.backoff.200 = false + a1.sinks.k1.rollback.200 = false + a1.sinks.k1.incrementMetrics.200 = true + + Custom Sink ~~~~~~~~~~~ http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/flume-http-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-http-sink/pom.xml b/flume-ng-sinks/flume-http-sink/pom.xml new file mode 100644 index 0000000..60dee65 --- /dev/null +++ b/flume-ng-sinks/flume-http-sink/pom.xml @@ -0,0 +1,136 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flume-ng-sinks</artifactId> + <groupId>org.apache.flume</groupId> + <version>1.8.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-http-sink</artifactId> + <name>Flume HTTP/S Sink</name> + + <properties> + <wiremock.version>1.53</wiremock.version> + <guava.version>18.0</guava.version> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-configuration</artifactId> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + 
</dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.github.tomakehurst</groupId> + <artifactId>wiremock</artifactId> + <version>${wiremock.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.3.5</version> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java new file mode 100644 index 0000000..9637326 --- /dev/null +++ b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.http; + +import com.google.common.collect.ImmutableMap; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Transaction; +import org.apache.flume.conf.Configurable; +import org.apache.flume.instrumentation.SinkCounter; +import org.apache.flume.sink.AbstractSink; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +/** + * Implementation of an HTTP sink. Events are POSTed to an HTTP / HTTPS + * endpoint. The error handling behaviour is configurable, and can respond + * differently depending on the response status returned by the endpoint. + * + * Rollback of the Flume transaction, and backoff can be specified globally, + * then overridden for ranges (or individual) status codes. + */ +public class HttpSink extends AbstractSink implements Configurable { + + /** Class logger. */ + private static final Logger LOG = Logger.getLogger(HttpSink.class); + + /** Lowest valid HTTP status code. */ + private static final int HTTP_STATUS_CONTINUE = 100; + + /** Default setting for the connection timeout when calling endpoint. */ + private static final int DEFAULT_CONNECT_TIMEOUT = 5000; + + /** Default setting for the request timeout when calling endpoint. 
*/ + private static final int DEFAULT_REQUEST_TIMEOUT = 5000; + + /** Default setting for the HTTP content type header. */ + private static final String DEFAULT_CONTENT_TYPE = "text/plain"; + + /** Default setting for the HTTP accept header. */ + private static final String DEFAULT_ACCEPT_HEADER = "text/plain"; + + /** Endpoint URL to POST events to. */ + private URL endpointUrl; + + /** Counter used to monitor event throughput. */ + private SinkCounter sinkCounter; + + /** Actual connection timeout value in use. */ + private int connectTimeout = DEFAULT_CONNECT_TIMEOUT; + + /** Actual request timeout value in use. */ + private int requestTimeout = DEFAULT_REQUEST_TIMEOUT; + + /** Actual content type header value in use. */ + private String contentTypeHeader = DEFAULT_CONTENT_TYPE; + + /** Actual accept header value in use. */ + private String acceptHeader = DEFAULT_ACCEPT_HEADER; + + /** Backoff value to use if a specific override is not defined. */ + private boolean defaultBackoff; + + /** Rollback value to use if a specific override is not defined. */ + private boolean defaultRollback; + + /** Increment metrics value to use if a specific override is not defined. */ + private boolean defaultIncrementMetrics; + + /** + * Holds all overrides for backoff. The key is a string of the format "500" or + * "5XX", and the value is the backoff value to use for the individual code, + * or code range. + */ + private HashMap<String, Boolean> backoffOverrides = new HashMap<>(); + + /** + * Holds all overrides for rollback. The key is a string of the format "500" + * or "5XX", and the value is the rollback value to use for the individual + * code, or code range. + */ + private HashMap<String, Boolean> rollbackOverrides = new HashMap<>(); + + /** + * Holds all overrides for increment metrics. The key is a string of the + * format "500" or "5XX", and the value is the increment metrics value to use + * for the individual code, or code range. 
+ */ + private HashMap<String, Boolean> incrementMetricsOverrides = new HashMap<>(); + + /** Used to create HTTP connections to the endpoint. */ + private ConnectionBuilder connectionBuilder; + + @Override + public final void configure(final Context context) { + String configuredEndpoint = context.getString("endpoint", ""); + LOG.info("Read endpoint URL from configuration : " + configuredEndpoint); + + try { + endpointUrl = new URL(configuredEndpoint); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Endpoint URL invalid", e); + } + + connectTimeout = context.getInteger("connectTimeout", + DEFAULT_CONNECT_TIMEOUT); + + if (connectTimeout <= 0) { + throw new IllegalArgumentException( + "Connect timeout must be a non-zero and positive"); + } + LOG.info("Using connect timeout : " + connectTimeout); + + requestTimeout = context.getInteger("requestTimeout", + DEFAULT_REQUEST_TIMEOUT); + + if (requestTimeout <= 0) { + throw new IllegalArgumentException( + "Request timeout must be a non-zero and positive"); + } + LOG.info("Using request timeout : " + requestTimeout); + + acceptHeader = context.getString("acceptHeader", DEFAULT_ACCEPT_HEADER); + LOG.info("Using Accept header value : " + acceptHeader); + + contentTypeHeader = context.getString("contentTypeHeader", + DEFAULT_CONTENT_TYPE); + LOG.info("Using Content-Type header value : " + contentTypeHeader); + + defaultBackoff = context.getBoolean("defaultBackoff", true); + LOG.info("Channel backoff by default is " + defaultBackoff); + + defaultRollback = context.getBoolean("defaultRollback", true); + LOG.info("Transaction rollback by default is " + defaultRollback); + + defaultIncrementMetrics = context.getBoolean("defaultIncrementMetrics", + false); + LOG.info("Incrementing metrics by default is " + defaultIncrementMetrics); + + parseConfigOverrides("backoff", context, backoffOverrides); + parseConfigOverrides("rollback", context, rollbackOverrides); + parseConfigOverrides("incrementMetrics", 
context, + incrementMetricsOverrides); + + if (this.sinkCounter == null) { + this.sinkCounter = new SinkCounter(this.getName()); + } + + connectionBuilder = new ConnectionBuilder(); + } + + @Override + public final void start() { + LOG.info("Starting HttpSink"); + sinkCounter.start(); + } + + @Override + public final void stop() { + LOG.info("Stopping HttpSink"); + sinkCounter.stop(); + } + + @Override + public final Status process() throws EventDeliveryException { + Status status = null; + OutputStream outputStream = null; + + Channel ch = getChannel(); + Transaction txn = ch.getTransaction(); + txn.begin(); + + try { + Event event = ch.take(); + + byte[] eventBody = null; + if (event != null) { + eventBody = event.getBody(); + } + + if (eventBody != null && eventBody.length > 0) { + sinkCounter.incrementEventDrainAttemptCount(); + LOG.debug("Sending request : " + new String(event.getBody())); + + try { + HttpURLConnection connection = connectionBuilder.getConnection(); + + outputStream = connection.getOutputStream(); + outputStream.write(eventBody); + outputStream.flush(); + outputStream.close(); + + int httpStatusCode = connection.getResponseCode(); + LOG.debug("Got status code : " + httpStatusCode); + + connection.getInputStream().close(); + LOG.debug("Response processed and closed"); + + if (httpStatusCode >= HTTP_STATUS_CONTINUE) { + String httpStatusString = String.valueOf(httpStatusCode); + + boolean shouldRollback = findOverrideValue(httpStatusString, + rollbackOverrides, defaultRollback); + + if (shouldRollback) { + txn.rollback(); + } else { + txn.commit(); + } + + boolean shouldBackoff = findOverrideValue(httpStatusString, + backoffOverrides, defaultBackoff); + + if (shouldBackoff) { + status = Status.BACKOFF; + } else { + status = Status.READY; + } + + boolean shouldIncrementMetrics = findOverrideValue(httpStatusString, + incrementMetricsOverrides, defaultIncrementMetrics); + + if (shouldIncrementMetrics) { + 
sinkCounter.incrementEventDrainSuccessCount(); + } + + if (shouldRollback) { + if (shouldBackoff) { + LOG.info(String.format("Got status code %d from HTTP server." + + " Rolled back event and backed off.", httpStatusCode)); + } else { + LOG.info(String.format("Got status code %d from HTTP server." + + " Rolled back event for retry.", httpStatusCode)); + } + } + } else { + txn.rollback(); + status = Status.BACKOFF; + + LOG.warn("Malformed response returned from server, retrying"); + } + + } catch (IOException e) { + txn.rollback(); + status = Status.BACKOFF; + + LOG.error("Error opening connection, or request timed out", e); + } + + } else { + txn.commit(); + status = Status.BACKOFF; + + LOG.warn("Processed empty event"); + } + + } catch (Throwable t) { + txn.rollback(); + status = Status.BACKOFF; + + LOG.error("Error sending HTTP request, retrying", t); + + // re-throw all Errors + if (t instanceof Error) { + throw (Error) t; + } + + } finally { + txn.close(); + + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException e) { + // ignore errors + } + } + } + + return status; + } + + /** + * Reads a set of override values from the context configuration and stores + * the results in the Map provided. 
+ * + * @param propertyName the prefix of the config property names + * @param context the context to use to read config properties + * @param override the override Map to store results in + */ + private void parseConfigOverrides(final String propertyName, + final Context context, + final Map<String, Boolean> override) { + + ImmutableMap<String, String> config = context.getSubProperties( + propertyName + "."); + + if (config != null) { + for (Map.Entry<String, String> value : config.entrySet()) { + LOG.info(String.format("Read %s value for status code %s as %s", + propertyName, value.getKey(), value.getValue())); + + if (override.containsKey(value.getKey())) { + LOG.warn(String.format("Ignoring duplicate config value for %s.%s", + propertyName, value.getKey())); + } else { + override.put(value.getKey(), Boolean.valueOf(value.getValue())); + } + } + } + } + + /** + * Queries the specified override map to find the most appropriate value. The + * most specific match is found. + * + * @param statusCode the String representation of the HTTP status code + * @param overrides the map of status code overrides + * @param defaultValue the default value to use if no override is configured + * + * @return the value of the most specific match to the given status code + */ + private boolean findOverrideValue(final String statusCode, + final HashMap<String, Boolean> overrides, + final boolean defaultValue) { + + Boolean overrideValue = overrides.get(statusCode); + if (overrideValue == null) { + overrideValue = overrides.get(statusCode.substring(0, 1) + "XX"); + if (overrideValue == null) { + overrideValue = defaultValue; + } + } + return overrideValue; + } + + /** + * Update the connection builder. + * + * @param builder the new value + */ + final void setConnectionBuilder(final ConnectionBuilder builder) { + this.connectionBuilder = builder; + } + + /** + * Update the sinkCounter. 
+ * + * @param newSinkCounter the new value + */ + final void setSinkCounter(final SinkCounter newSinkCounter) { + this.sinkCounter = newSinkCounter; + } + + /** + * Class used to allow extending the connection building functionality. + */ + class ConnectionBuilder { + + /** + * Creates an HTTP connection to the configured endpoint address. This + * connection is setup for a POST request, and uses the content type and + * accept header values in the configuration. + * + * @return the connection object + * @throws IOException on any connection error + */ + public HttpURLConnection getConnection() throws IOException { + HttpURLConnection connection = (HttpURLConnection) + endpointUrl.openConnection(); + + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", contentTypeHeader); + connection.setRequestProperty("Accept", acceptHeader); + connection.setConnectTimeout(connectTimeout); + connection.setReadTimeout(requestTimeout); + connection.setDoOutput(true); + connection.setDoInput(true); + connection.connect(); + return connection; + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/package-info.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/package-info.java b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/package-info.java new file mode 100644 index 0000000..5b88f01 --- /dev/null +++ b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * This package provides an HTTP sink for Flume so that events can be sent out + * to a target HTTP endpoint. + */ +package org.apache.flume.sink.http; http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java new file mode 100644 index 0000000..16cb6e8 --- /dev/null +++ b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.flume.sink.http;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Sink.Status;
import org.apache.flume.Transaction;
import org.apache.flume.instrumentation.SinkCounter;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link HttpSink}: configuration handling is checked against a mocked
 * {@link Context}, and {@code process()} is checked against mocked channel, transaction,
 * counter and HTTP connection objects.
 */
@RunWith(MockitoJUnitRunner.class)
public class TestHttpSink {

  private static final Integer DEFAULT_REQUEST_TIMEOUT = 5000;
  private static final Integer DEFAULT_CONNECT_TIMEOUT = 5000;
  private static final String DEFAULT_ACCEPT_HEADER = "text/plain";
  private static final String DEFAULT_CONTENT_TYPE_HEADER = "text/plain";

  /** Non-empty event payload; the charset is pinned so the bytes do not depend on the host. */
  private static final byte[] EVENT_BODY = "something".getBytes(StandardCharsets.UTF_8);

  @Mock
  private SinkCounter sinkCounter;

  @Mock
  private Context configContext;

  @Mock
  private Channel channel;

  @Mock
  private Transaction transaction;

  @Mock
  private Event event;

  @Mock
  private HttpURLConnection httpURLConnection;

  @Mock
  private OutputStream outputStream;

  @Mock
  private InputStream inputStream;

  /** Every supported configuration key must be looked up during {@code configure}. */
  @Test
  public void ensureAllConfigurationOptionsRead() {
    whenDefaultStringConfig();
    whenDefaultBooleanConfig();
    whenDefaultTimeouts();

    new HttpSink().configure(configContext);

    verify(configContext).getString("endpoint", "");
    verify(configContext).getInteger(eq("connectTimeout"), Mockito.anyInt());
    verify(configContext).getInteger(eq("requestTimeout"), Mockito.anyInt());
    verify(configContext).getString(eq("acceptHeader"), Mockito.anyString());
    verify(configContext).getString(eq("contentTypeHeader"), Mockito.anyString());
    verify(configContext).getBoolean("defaultBackoff", true);
    verify(configContext).getBoolean("defaultRollback", true);
    verify(configContext).getBoolean("defaultIncrementMetrics", false);
  }

  @Test(expected = IllegalArgumentException.class)
  public void ensureExceptionIfEndpointUrlEmpty() {
    when(configContext.getString("endpoint", "")).thenReturn("");
    new HttpSink().configure(configContext);
  }

  @Test(expected = IllegalArgumentException.class)
  public void ensureExceptionIfEndpointUrlInvalid() {
    when(configContext.getString("endpoint", "")).thenReturn("invalid url");
    new HttpSink().configure(configContext);
  }

  @Test(expected = IllegalArgumentException.class)
  public void ensureExceptionIfConnectTimeoutNegative() {
    whenDefaultStringConfig();
    whenDefaultTimeouts();
    // Match on the key only: a stub keyed on a specific default value would never be hit if
    // the sink passes a different default, and the negative value would not be exercised.
    when(configContext.getInteger(eq("connectTimeout"), Mockito.anyInt())).thenReturn(-1000);
    new HttpSink().configure(configContext);
  }

  @Test
  public void ensureDefaultConnectTimeoutCorrect() {
    whenDefaultStringConfig();
    when(configContext.getInteger("connectTimeout", DEFAULT_CONNECT_TIMEOUT)).thenReturn(1000);
    when(configContext.getInteger(eq("requestTimeout"), Mockito.anyInt())).thenReturn(1000);
    new HttpSink().configure(configContext);
    verify(configContext).getInteger("connectTimeout", DEFAULT_CONNECT_TIMEOUT);
  }

  @Test(expected = IllegalArgumentException.class)
  public void ensureExceptionIfRequestTimeoutNegative() {
    whenDefaultStringConfig();
    whenDefaultTimeouts();
    // Key-only matching, for the same reason as the connect-timeout variant above.
    when(configContext.getInteger(eq("requestTimeout"), Mockito.anyInt())).thenReturn(-1000);
    new HttpSink().configure(configContext);
  }

  @Test
  public void ensureDefaultRequestTimeoutCorrect() {
    whenDefaultStringConfig();
    when(configContext.getInteger("requestTimeout", DEFAULT_REQUEST_TIMEOUT)).thenReturn(1000);
    when(configContext.getInteger(eq("connectTimeout"), Mockito.anyInt())).thenReturn(1000);
    new HttpSink().configure(configContext);
    verify(configContext).getInteger("requestTimeout", DEFAULT_REQUEST_TIMEOUT);
  }

  @Test
  public void ensureDefaultAcceptHeaderCorrect() {
    whenDefaultTimeouts();
    whenDefaultStringConfig();
    new HttpSink().configure(configContext);
    verify(configContext).getString("acceptHeader", DEFAULT_ACCEPT_HEADER);
  }

  @Test
  public void ensureDefaultContentTypeHeaderCorrect() {
    whenDefaultTimeouts();
    whenDefaultStringConfig();
    new HttpSink().configure(configContext);
    verify(configContext).getString("contentTypeHeader", DEFAULT_CONTENT_TYPE_HEADER);
  }

  @Test
  public void ensureBackoffOnNullEvent() throws Exception {
    when(channel.take()).thenReturn(null);
    executeWithMocks(true);
  }

  @Test
  public void ensureBackoffOnNullEventBody() throws Exception {
    when(channel.take()).thenReturn(event);
    when(event.getBody()).thenReturn(null);
    executeWithMocks(true);
  }

  @Test
  public void ensureBackoffOnEmptyEvent() throws Exception {
    when(channel.take()).thenReturn(event);
    when(event.getBody()).thenReturn(new byte[]{});
    executeWithMocks(true);
  }

  @Test
  public void ensureRollbackBackoffAndIncrementMetricsIfConfigured() throws Exception {
    whenChannelHasEventWithBody();

    Context context = new Context();
    context.put("defaultRollback", "true");
    context.put("defaultBackoff", "true");
    context.put("defaultIncrementMetrics", "true");

    executeWithMocks(false, Status.BACKOFF, true, true, context, HttpURLConnection.HTTP_OK);
  }

  @Test
  public void ensureCommitReadyAndNoIncrementMetricsIfConfigured() throws Exception {
    whenChannelHasEventWithBody();

    Context context = new Context();
    context.put("defaultRollback", "false");
    context.put("defaultBackoff", "false");
    context.put("defaultIncrementMetrics", "false");

    executeWithMocks(true, Status.READY, false, false, context, HttpURLConnection.HTTP_OK);
  }

  @Test
  public void ensureSingleStatusConfigurationCorrectlyUsed() throws Exception {
    whenChannelHasEventWithBody();

    Context context = new Context();
    context.put("defaultRollback", "true");
    context.put("defaultBackoff", "true");
    context.put("defaultIncrementMetrics", "false");
    context.put("rollback.200", "false");
    context.put("backoff.200", "false");
    context.put("incrementMetrics.200", "true");

    executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_OK);
  }

  @Test
  public void ensureGroupConfigurationCorrectlyUsed() throws Exception {
    whenChannelHasEventWithBody();

    Context context = new Context();
    context.put("defaultRollback", "true");
    context.put("defaultBackoff", "true");
    context.put("defaultIncrementMetrics", "false");
    context.put("rollback.2XX", "false");
    context.put("backoff.2XX", "false");
    context.put("incrementMetrics.2XX", "true");

    executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_OK);
    executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_NO_CONTENT);
  }

  @Test
  public void ensureSingleStatusConfigurationOverridesGroupConfigurationCorrectly()
      throws Exception {

    whenChannelHasEventWithBody();

    Context context = new Context();
    context.put("rollback.2XX", "false");
    context.put("backoff.2XX", "false");
    context.put("incrementMetrics.2XX", "true");
    context.put("rollback.200", "true");
    context.put("backoff.200", "true");
    context.put("incrementMetrics.200", "false");

    executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_NO_CONTENT);
    executeWithMocks(false, Status.BACKOFF, false, true, context, HttpURLConnection.HTTP_OK);
  }

  /**
   * Runs the sink with an otherwise empty configuration and expects {@link Status#BACKOFF}
   * with no metric increments.
   *
   * @param commit whether the transaction is expected to be committed (otherwise rolled back)
   */
  private void executeWithMocks(boolean commit) throws Exception {
    Context context = new Context();
    executeWithMocks(commit, Status.BACKOFF, false, false, context, HttpURLConnection.HTTP_OK);
  }

  /**
   * Configures a fresh {@link HttpSink} from {@code context}, wires it to the mocked channel,
   * counter and connection, calls {@code process()} once and verifies the outcome.
   *
   * @param expectedCommit {@code true} to expect a commit, {@code false} to expect a rollback
   * @param expectedStatus status that {@code process()} must return
   * @param expectedIncrementSuccessMetrics whether the drain-success counter must be bumped
   * @param expectedIncrementAttemptMetrics whether the drain-attempt counter must be bumped
   * @param context sink configuration; the {@code endpoint} key is added by this method
   * @param httpStatus response code reported by the mocked connection
   */
  private void executeWithMocks(boolean expectedCommit, Status expectedStatus,
                                boolean expectedIncrementSuccessMetrics,
                                boolean expectedIncrementAttemptMetrics,
                                Context context, int httpStatus)
      throws Exception {

    context.put("endpoint", "http://localhost:8080/endpoint");

    HttpSink httpSink = new HttpSink();
    httpSink.configure(context);
    // Swap the real connection factory for one that hands back the mocked connection.
    httpSink.setConnectionBuilder(httpSink.new ConnectionBuilder() {
      @Override
      public HttpURLConnection getConnection() throws IOException {
        return httpURLConnection;
      }
    });
    httpSink.setChannel(channel);
    httpSink.setSinkCounter(sinkCounter);

    when(channel.getTransaction()).thenReturn(transaction);
    when(httpURLConnection.getOutputStream()).thenReturn(outputStream);
    when(httpURLConnection.getInputStream()).thenReturn(inputStream);
    when(httpURLConnection.getResponseCode()).thenReturn(httpStatus);

    Status actualStatus = httpSink.process();

    // A JUnit assertion rather than the `assert` keyword: the keyword is skipped entirely
    // when the JVM runs without -ea, which would let a wrong status pass unnoticed.
    assertEquals(expectedStatus, actualStatus);

    // NOTE(review): each verification below builds its own InOrder, so the relative order of
    // begin/commit/rollback/close is not enforced across statements. This form presumably
    // exists so tests that call this helper twice on the same mocks still verify cleanly;
    // confirm before tightening it into a single shared InOrder.
    inOrder(transaction).verify(transaction).begin();

    if (expectedIncrementAttemptMetrics) {
      inOrder(sinkCounter).verify(sinkCounter).incrementEventDrainAttemptCount();
    }

    if (expectedCommit) {
      inOrder(transaction).verify(transaction).commit();
    } else {
      inOrder(transaction).verify(transaction).rollback();
    }

    if (expectedIncrementSuccessMetrics) {
      inOrder(sinkCounter).verify(sinkCounter).incrementEventDrainSuccessCount();
    }

    inOrder(transaction).verify(transaction).close();
  }

  /** Makes the mocked channel hand out the mocked event carrying a non-empty body. */
  private void whenChannelHasEventWithBody() throws Exception {
    when(channel.take()).thenReturn(event);
    when(event.getBody()).thenReturn(EVENT_BODY);
  }

  /**
   * Stubs the string settings. The header lookups match on the key only, so the stubbed
   * values are returned whatever default the sink supplies.
   */
  private void whenDefaultStringConfig() {
    when(configContext.getString("endpoint", "")).thenReturn("http://test.abc/");
    when(configContext.getString(eq("acceptHeader"), Mockito.anyString()))
        .thenReturn("test/accept");
    when(configContext.getString(eq("contentTypeHeader"), Mockito.anyString()))
        .thenReturn("test/content");
  }

  /** Stubs the three boolean defaults so each lookup returns {@code true}. */
  private void whenDefaultBooleanConfig() {
    when(configContext.getBoolean("defaultBackoff", true)).thenReturn(true);
    when(configContext.getBoolean("defaultRollback", true)).thenReturn(true);
    when(configContext.getBoolean("defaultIncrementMetrics", false)).thenReturn(true);
  }

  /** Stubs both timeout lookups to 1000 regardless of the default the sink passes. */
  private void whenDefaultTimeouts() {
    when(configContext.getInteger(eq("requestTimeout"), Mockito.anyInt())).thenReturn(1000);
    when(configContext.getInteger(eq("connectTimeout"), Mockito.anyInt())).thenReturn(1000);
  }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java new file mode 100644 index 0000000..74dcf1d --- /dev/null +++ b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.flume.sink.http;

import com.github.tomakehurst.wiremock.global.RequestDelaySpec;
import com.github.tomakehurst.wiremock.http.Fault;
import com.github.tomakehurst.wiremock.http.Request;
import com.github.tomakehurst.wiremock.http.RequestListener;
import com.github.tomakehurst.wiremock.http.Response;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.event.SimpleEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.equalToJson;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED;
import static org.apache.flume.Sink.Status;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Exercises {@link HttpSink} end to end against a WireMock server bound to port 8080,
 * feeding events through a real {@link MemoryChannel}.
 */
@RunWith(MockitoJUnitRunner.class)
public class TestHttpSinkIT {

  private static final int RESPONSE_TIMEOUT = 4000;
  private static final int CONNECT_TIMEOUT = 2500;

  @Rule
  public WireMockRule service = new WireMockRule(wireMockConfig().port(8080));

  private MemoryChannel channel;

  private HttpSink httpSink;

  /** Builds and starts the channel and the sink unless a sink already exists. */
  @Before
  public void setupSink() {
    if (httpSink != null) {
      return;
    }

    Context sinkSettings = new Context();
    sinkSettings.put("endpoint", "http://localhost:8080/endpoint");
    sinkSettings.put("requestTimeout", "2000");
    sinkSettings.put("connectTimeout", "1500");
    sinkSettings.put("acceptHeader", "application/json");
    sinkSettings.put("contentTypeHeader", "application/json");
    sinkSettings.put("backoff.200", "false");
    sinkSettings.put("rollback.200", "false");
    sinkSettings.put("incrementMetrics.200", "true");

    Context channelSettings = new Context();

    channel = new MemoryChannel();
    channel.configure(channelSettings);
    channel.start();

    httpSink = new HttpSink();
    httpSink.configure(sinkSettings);
    httpSink.setChannel(channel);
    httpSink.start();
  }

  /** Stops the sink, then pauses for half a second before the next test starts. */
  @After
  public void waitForShutdown() throws InterruptedException {
    httpSink.stop();
    TimeUnit.MILLISECONDS.sleep(500);
  }

  /** A single event answered with 200 results in exactly one POST. */
  @Test
  public void ensureSuccessfulMessageDelivery() throws Exception {
    String body = event("SUCCESS");

    service.stubFor(post(urlEqualTo("/endpoint"))
        .withRequestBody(equalToJson(body))
        .willReturn(aResponse().withStatus(200)));

    addEventToChannel(body);

    verifyPostCount(1, body);
  }

  /** First POST gets a 503 (BACKOFF expected), the repeat gets a 200 (READY expected). */
  @Test
  public void ensureEventsResentOn503Failure() throws Exception {
    String scenarioName = "Error Scenario";
    String body = event("TRANSIENT_ERROR");

    service.stubFor(post(urlEqualTo("/endpoint"))
        .inScenario(scenarioName)
        .whenScenarioStateIs(STARTED)
        .withRequestBody(equalToJson(body))
        .willReturn(aResponse().withStatus(503))
        .willSetStateTo("Error Sent"));

    service.stubFor(post(urlEqualTo("/endpoint"))
        .inScenario(scenarioName)
        .whenScenarioStateIs("Error Sent")
        .withRequestBody(equalToJson(body))
        .willReturn(aResponse().withStatus(200)));

    addEventToChannel(body, Status.BACKOFF);
    addEventToChannel(body, Status.READY);

    verifyPostCount(2, body);
  }

  /** First POST hits a connection fault (BACKOFF expected), the repeat succeeds. */
  @Test
  public void ensureEventsResentOnNetworkFailure() throws Exception {
    String scenarioName = "Error Scenario";
    String body = event("NETWORK_ERROR");

    service.stubFor(post(urlEqualTo("/endpoint"))
        .inScenario(scenarioName)
        .whenScenarioStateIs(STARTED)
        .withRequestBody(equalToJson(body))
        .willReturn(aResponse().withFault(Fault.RANDOM_DATA_THEN_CLOSE))
        .willSetStateTo("Error Sent"));

    service.stubFor(post(urlEqualTo("/endpoint"))
        .inScenario(scenarioName)
        .whenScenarioStateIs("Error Sent")
        .withRequestBody(equalToJson(body))
        .willReturn(aResponse().withStatus(200)));

    addEventToChannel(body, Status.BACKOFF);
    addEventToChannel(body, Status.READY);

    verifyPostCount(2, body);
  }

  /**
   * Delays socket acceptance until the first request has been seen; the first attempt is
   * expected to back off and the second to succeed.
   */
  @Test
  public void ensureEventsResentOnConnectionTimeout() throws Exception {
    final CountDownLatch requestSeen = new CountDownLatch(1);
    String body = event("SLOW_SOCKET");

    service.addSocketAcceptDelay(new RequestDelaySpec(CONNECT_TIMEOUT));
    service.addMockServiceRequestListener(new RequestListener() {
      @Override
      public void requestReceived(Request request, Response response) {
        // Drop the accept delay as soon as one request got through.
        service.addSocketAcceptDelay(new RequestDelaySpec(0));
        requestSeen.countDown();
      }
    });

    service.stubFor(post(urlEqualTo("/endpoint"))
        .withRequestBody(equalToJson(body))
        .willReturn(aResponse().withStatus(200)));

    addEventToChannel(body, Status.BACKOFF);

    // wait until the socket is connected
    requestSeen.await(2000, TimeUnit.MILLISECONDS);

    addEventToChannel(body, Status.READY);

    verifyPostCount(2, body);
  }

  /** First response is delayed by RESPONSE_TIMEOUT (BACKOFF expected), the repeat is prompt. */
  @Test
  public void ensureEventsResentOnRequestTimeout() throws Exception {
    String scenarioName = "Error Scenario";
    String body = event("SLOW_RESPONSE");

    service.stubFor(post(urlEqualTo("/endpoint"))
        .inScenario(scenarioName)
        .whenScenarioStateIs(STARTED)
        .withRequestBody(equalToJson(body))
        .willReturn(aResponse().withFixedDelay(RESPONSE_TIMEOUT).withStatus(200))
        .willSetStateTo("Slow Response Sent"));

    service.stubFor(post(urlEqualTo("/endpoint"))
        .inScenario(scenarioName)
        .whenScenarioStateIs("Slow Response Sent")
        .withRequestBody(equalToJson(body))
        .willReturn(aResponse().withStatus(200)));

    addEventToChannel(body, Status.BACKOFF);
    addEventToChannel(body, Status.READY);

    verifyPostCount(2, body);
  }

  /** Three deliveries with a one-second accept delay must finish in under 2.5 seconds. */
  @Test
  public void ensureHttpConnectionReusedForSuccessfulRequests() throws Exception {
    String body = event("SUCCESS");

    // we should only get one delay when establishing a connection
    service.addSocketAcceptDelay(new RequestDelaySpec(1000));

    service.stubFor(post(urlEqualTo("/endpoint"))
        .withRequestBody(equalToJson(body))
        .willReturn(aResponse().withStatus(200)));

    long startedAt = System.currentTimeMillis();

    for (int delivery = 0; delivery < 3; delivery++) {
      addEventToChannel(body, Status.READY);
    }

    long elapsedMillis = System.currentTimeMillis() - startedAt;
    assertTrue("Test should have completed faster", elapsedMillis < 2500);

    verifyPostCount(3, body);
  }

  /** Asserts that {@code expectedCount} POSTs carrying {@code body} reached the endpoint. */
  private void verifyPostCount(int expectedCount, String body) {
    service.verify(expectedCount, postRequestedFor(urlEqualTo("/endpoint"))
        .withRequestBody(equalToJson(body)));
  }

  /** Queues {@code line} and expects the sink to report {@link Status#READY}. */
  private void addEventToChannel(String line) throws EventDeliveryException {
    addEventToChannel(line, Status.READY);
  }

  /**
   * Puts one event with body {@code line} on the channel in its own transaction, runs a
   * single {@code process()} pass and checks the returned status.
   */
  private void addEventToChannel(String line, Status expectedStatus)
      throws EventDeliveryException {

    SimpleEvent payload = new SimpleEvent();
    payload.setBody(line.getBytes());

    Transaction putTransaction = channel.getTransaction();
    putTransaction.begin();
    channel.put(payload);
    putTransaction.commit();
    putTransaction.close();

    Sink.Status actualStatus = httpSink.process();

    assertEquals(expectedStatus, actualStatus);
  }

  /** Builds the single-quoted JSON document used as the event body for {@code id}. */
  private String event(String id) {
    return String.format("{'id':'%s'}", id);
  }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/flume-http-sink/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-http-sink/src/test/resources/log4j.properties b/flume-ng-sinks/flume-http-sink/src/test/resources/log4j.properties new file mode 100644 index 0000000..783022d --- /dev/null +++ b/flume-ng-sinks/flume-http-sink/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied.
See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +log4j.rootLogger = ALL, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout + +log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender +log4j.appender.flume.Hostname = 127.0.0.1 +log4j.appender.flume.Port = 4141 +log4j.appender.flume.layout=org.apache.log4j.PatternLayout http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/flume-ng-sinks/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml index b49b74b..9072572 100644 --- a/flume-ng-sinks/pom.xml +++ b/flume-ng-sinks/pom.xml @@ -47,6 +47,7 @@ limitations under the License. <module>flume-ng-elasticsearch-sink</module> <module>flume-ng-morphline-solr-sink</module> <module>flume-ng-kafka-sink</module> + <module>flume-http-sink</module> </modules> <profiles> http://git-wip-us.apache.org/repos/asf/flume/blob/a8449147/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f62c99a..e823e3a 100644 --- a/pom.xml +++ b/pom.xml @@ -1162,6 +1162,13 @@ limitations under the License. <version>1.8.0-SNAPSHOT</version> </dependency> + + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-http-sink</artifactId> + <version>1.8.0-SNAPSHOT</version> + </dependency> + <dependency> <groupId>org.apache.flume.flume-ng-sinks</groupId> <artifactId>flume-irc-sink</artifactId>
