Author: rgoers
Date: Sun Aug 19 23:07:48 2012
New Revision: 1374875
URL: http://svn.apache.org/viewvc?rev=1374875&view=rev
Log:
Fix Embedded Agent properties. Document embedded Flume agents
Modified:
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml
Modified:
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java
URL:
http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java?rev=1374875&r1=1374874&r2=1374875&view=diff
==============================================================================
---
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java
(original)
+++
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java
Sun Aug 19 23:07:48 2012
@@ -56,6 +56,10 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
/**
* See Flume's PropertiesFileConfigurationProvider. This class would extend
that if it were possible.
@@ -71,31 +75,36 @@ public class FlumeConfigurationBuilder {
public NodeConfiguration load(String name, Properties props,
NodeConfigurationAware configurationAware) {
NodeConfiguration conf = new SimpleNodeConfiguration();
- FlumeConfiguration fconfig = new FlumeConfiguration(props);
- List<FlumeConfigurationError> errors =
fconfig.getConfigurationErrors();
- if (errors.size() > 0) {
- boolean isError = false;
- for (FlumeConfigurationError error : errors) {
- StringBuilder sb = new StringBuilder();
- sb.append(error.getComponentName()).append("
").append(error.getKey()).append(" ");
- sb.append(error.getErrorType().name()).append(" -
").append(error.getErrorType().getError());
- switch (error.getErrorOrWarning()) {
- case ERROR:
- isError = true;
- LOGGER.error(sb.toString());
- break;
- case WARNING:
- LOGGER.warn(sb.toString());
- break;
+ FlumeConfiguration fconfig;
+ try {
+ fconfig = new FlumeConfiguration(props);
+ List<FlumeConfigurationError> errors =
fconfig.getConfigurationErrors();
+ if (errors.size() > 0) {
+ boolean isError = false;
+ for (FlumeConfigurationError error : errors) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Component:
").append(error.getComponentName()).append(" ");
+ sb.append("Key: ").append(error.getKey()).append(" ");
+ sb.append(error.getErrorType().name()).append(" -
").append(error.getErrorType().getError());
+ switch (error.getErrorOrWarning()) {
+ case ERROR:
+ isError = true;
+ LOGGER.error(sb.toString());
+ break;
+ case WARNING:
+ LOGGER.warn(sb.toString());
+ break;
+ }
}
- }
- if (isError) {
- for (String key : props.stringPropertyNames()) {
- LOGGER.error(key + "=" + props.getProperty(key));
+ if (isError) {
+ throw new ConfigurationException("Unable to configure
Flume due to errors");
}
- throw new ConfigurationException("Unable to configure Flume
due to errors");
}
+ } catch (RuntimeException ex) {
+ printProps(props);
+ throw ex;
}
+
FlumeConfiguration.AgentConfiguration agentConf =
fconfig.getConfigurationFor(name);
if (agentConf != null) {
@@ -111,6 +120,12 @@ public class FlumeConfigurationBuilder {
return conf;
}
+ private void printProps(Properties props) {
+ for (String key : new TreeSet<String>(props.stringPropertyNames())) {
+ LOGGER.error(key + "=" + props.getProperty(key));
+ }
+ }
+
protected void loadChannels(FlumeConfiguration.AgentConfiguration
agentConf, NodeConfiguration conf) {
LOGGER.info("Creating channels");
Set<String> channels = agentConf.getChannelSet();
Modified:
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
URL:
http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java?rev=1374875&r1=1374874&r2=1374875&view=diff
==============================================================================
---
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
(original)
+++
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
Sun Aug 19 23:07:48 2012
@@ -212,18 +212,18 @@ public class FlumeEmbeddedManager extend
for (int i=0; i < agents.length; ++i) {
sb.append(leading).append("agent").append(i);
leading = " ";
- String prefix = name + "sinks.agent" + i;
+ String prefix = name + ".sinks.agent" + i;
props.put(prefix + ".channel", "file");
props.put(prefix + ".type", "avro");
props.put(prefix + ".hostname", agents[i].getHost());
- props.put(prefix + ".port", agents[i].getPort());
- props.put(prefix + ".batch-size", batchSize);
- props.put(name + ".sinkgroups.group1.sinks", "agent" +i);
+ props.put(prefix + ".port",
Integer.toString(agents[i].getPort()));
+ props.put(prefix + ".batch-size",
Integer.toString(batchSize));
props.put(name +
".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
--priority;
}
props.put(name + ".sinks", sb.toString());
props.put(name + ".sinkgroups", "group1");
+ props.put(name + ".sinkgroups.group1.sinks", sb.toString());
props.put(name + ".sinkgroups.group1.processor.type",
"failover");
String sourceChannels = "file";
props.put(name + ".channels", sourceChannels);
Modified:
logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
URL:
http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml?rev=1374875&r1=1374874&r2=1374875&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
(original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
Sun Aug 19 23:07:48 2012
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="warn" name="MyApp" packages="">
<appenders>
- <Flume name="eventLogger" suppressExceptions="false" compress="true">
+ <Flume name="eventLogger" suppressExceptions="false" compress="true"
embedded="true">
<Agent host="localhost" port="12345"/>
<Agent host="localhost" port="12346"/>
<RFC5424Layout enterpriseNumber="18060" includeMDC="true"
appName="MyApp"/>
Modified: logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml
URL:
http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml?rev=1374875&r1=1374874&r2=1374875&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml (original)
+++ logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml Sun Aug 19
23:07:48 2012
@@ -275,19 +275,32 @@
]]></source>
</p>
</subsection>
- <a name="FlumeAvroAppender"/>
- <subsection name="FlumeAvroAppender">
+ <a name="FlumeAppender"/>
+ <subsection name="FlumeAppender">
<p><i>This is an optional component supplied in a separate
jar.</i></p>
<p><a href="http://incubator.apache.org/projects/flume.html">Apache
Flume</a> is a distributed, reliable,
and available system for efficiently collecting, aggregating, and
moving large amounts of log data
from many different sources to a centralized data store. The
FlumeAppender takes LogEvents and sends
them to a Flume agent as serialized Avro events for
consumption.</p>
- <p>
+ <p><i>Note:</i><br/>
There are two versions of the Flume Appender in the source
control. The first is for "Flume OG", the
original version of Flume before it became an Apache project. The
second is for "Flume NG", which is
maintained by the Apache Flume project. Only the Flume NG appender
is included in Log4j 2 releases as
Flume OG itself is considered deprecated.
</p>
+ <p>
+ The Flume Appender supports two modes of operation.
+ <ol>
+ <li>It can act as a remote Flume client which sends Flume events
via Avro to a Flume Agent configured
+ with an Avro Source.</li>
+ <li>It can act as an embedded Flume Agent where Flume events
pass directly into Flume for processing.</li>
+ </ol>
+ Usage as an embedded agent will cause the messages to be directly
passed to the Flume Channel and then
+ control will be immediately returned to the application. All
interaction with remote agents will occur
+ asynchronously. Setting the "embedded" attribute to "true" will
force the use of the embedded agent. In
+ addition, configuring agent properties in the appender
configuration will also cause the embedded agent
+ to be used.
+ </p>
<table border="1" width="100%">
<tr>
<th>Parameter Name</th>
@@ -299,7 +312,9 @@
<td>Agent[]</td>
<td>An array of Agents to which the logging events should be
sent. If more than one agent is specified
the first Agent will be the primary and subsequent Agents will
be used in the order specified as
- secondaries should the primary Agent fail. Each Agent
definition supplies the Agents host and port.</td>
+ secondaries should the primary Agent fail. Each Agent
definition supplies the Agents host and port.
+ The specification of agents and properties are mutually
exclusive. If both are configured an
+ error will result.</td>
</tr>
<tr>
<td>agentRetries</td>
@@ -318,6 +333,13 @@
<td>When set to true the message body will be compressed using
gzip</td>
</tr>
<tr>
+ <td>embedded</td>
+ <td>boolean</td>
+ <td>When set to true the embedded Flume agent will be used. When
Agent elements are used the events
+ will be sent to a file channel and then routed to a
FailoverSinkProcessor which will use
+ each configured agent in the order they are declared.</td>
+ </tr>
+ <tr>
<td>filter</td>
<td>Filter</td>
<td>A Filter to determine if the event should be handled by this
Appender. More than one Filter
@@ -371,6 +393,14 @@
<td>The name of the Appender.</td>
</tr>
<tr>
+ <td>properties</td>
+ <td>Property[]</td>
+ <td>One or more Property elements that are used to configure the
Flume Agent. The properties must be
+ configured without the agent name (the appender name is used
for this) and no sources can be
+ configured. All other Flume configuration properties are
allowed. Specifying both Agent and Property
+ elements will result in an error.</td>
+ </tr>
+ <tr>
<td>reconnectionDelay</td>
<td>integer</td>
<td>The number of milliseconds the application should wait
before trying again to connect to the
@@ -386,7 +416,7 @@
<caption align="top">FlumeAvroAppender Parameters</caption>
</table>
<p>
- A sample FlumeAvroAppender configuration that is configured with
a primary and a secondary agent,
+ A sample FlumeAppender configuration that is configured with a
primary and a secondary agent,
compresses the body, and formats the body using the
RFC5424Layout:
<source><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
@@ -406,6 +436,78 @@
</configuration>
]]></source>
</p>
+ <p>
+ A sample FlumeAppender configuration that is configured with a
primary and a secondary agent,
+ compresses the body, formats the body using RFC5424Layout and
passes the events to an embedded Flume
+ Agent.
+ </p>
+ <source><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
+<configuration status="warn" name="MyApp" packages="">
+ <appenders>
+ <Flume name="eventLogger" suppressExceptions="false" compress="true"
embedded="true">
+ <Agent host="192.168.10.101" port="8800"/>
+ <Agent host="192.168.10.102" port="8800"/>
+ <RFC5424Layout enterpriseNumber="18060" includeMDC="true"
appName="MyApp"/>
+ </Flume>
+ <Console name="STDOUT">
+ <PatternLayout pattern="%d [%p] %c %m%n"/>
+ </Console>
+ </appenders>
+ <loggers>
+ <logger name="EventLogger" level="info">
+ <appender-ref ref="eventLogger"/>
+ </logger>
+ <root level="warn">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ </loggers>
+</configuration>
+ ]]></source>
+ <p>
+ A sample FlumeAppender configuration that is configured with a
primary and a secondary agent using
+ Flume configuration properties, compresses the body, formats the
body using RFC5424Layout and passes the
+ events to an embedded Flume Agent.
+ </p>
+ <source><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
+<configuration status="error" name="MyApp" packages="">
+ <appenders>
+ <Flume name="eventLogger" suppressExceptions="false" compress="true"
embedded="true">
+ <Property name="channels">file</Property>
+ <Property name="channels.file.type">file</Property>
+ <Property
name="channels.file.checkpointDir">target/file-channel/checkpoint</Property>
+ <Property
name="channels.file.dataDirs">target/file-channel/data</Property>
+ <Property name="sinks">agent1 agent2</Property>
+ <Property name="sinks.agent1.channel">file</Property>
+ <Property name="sinks.agent1.type">avro</Property>
+ <Property name="sinks.agent1.hostname">192.168.10.101</Property>
+ <Property name="sinks.agent1.port">8800</Property>
+ <Property name="sinks.agent1.batch-size">100</Property>
+ <Property name="sinks.agent2.channel">file</Property>
+ <Property name="sinks.agent2.type">avro</Property>
+ <Property name="sinks.agent2.hostname">192.168.10.102</Property>
+ <Property name="sinks.agent2.port">8800</Property>
+ <Property name="sinks.agent2.batch-size">100</Property>
+ <Property name="sinkgroups">group1</Property>
+ <Property name="sinkgroups.group1.sinks">agent1 agent2</Property>
+ <Property name="sinkgroups.group1.processor.type">failover</Property>
+ <Property
name="sinkgroups.group1.processor.priority.agent1">10</Property>
+ <Property name="sinkgroups.group1.processor.priority.agent2">5</Property>
+ <RFC5424Layout enterpriseNumber="18060" includeMDC="true"
appName="MyApp"/>
+ </Flume>
+ <Console name="STDOUT">
+ <PatternLayout pattern="%d [%p] %c %m%n"/>
+ </Console>
+ </appenders>
+ <loggers>
+ <logger name="EventLogger" level="info">
+ <appender-ref ref="eventLogger"/>
+ </logger>
+ <root level="warn">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ </loggers>
+</configuration>
+ ]]></source>
</subsection>
<a name="JMSQueueAppender"/>
<subsection name="JMSQueueAppender">