Author: rgoers
Date: Fri Oct 28 22:38:43 2011
New Revision: 1190645
URL: http://svn.apache.org/viewvc?rev=1190645&view=rev
Log:
Use a factory to create FlumeEvent
Added:
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEventFactory.java
Modified:
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppender.java
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEvent.java
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppenderTest.java
Modified:
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppender.java
URL:
http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppender.java?rev=1190645&r1=1190644&r2=1190645&view=diff
==============================================================================
---
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppender.java
(original)
+++
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppender.java
Fri Oct 28 22:38:43 2011
@@ -32,7 +32,7 @@ import java.net.InetAddress;
*
*/
@Plugin(name="Flume",type="Core",elementType="appender",printObject=true)
-public class FlumeAvroAppender extends AppenderBase {
+public class FlumeAvroAppender extends AppenderBase implements
FlumeEventFactory {
private FlumeAvroManager manager;
@@ -52,9 +52,12 @@ public class FlumeAvroAppender extends A
private final int retries;
+ private final FlumeEventFactory factory;
+
private FlumeAvroAppender(String name, Filter filter, Layout layout,
boolean handleException,
String hostname, String includes, String
excludes, String required, String mdcPrefix,
- String eventPrefix, boolean compress, int delay,
int retries, FlumeAvroManager manager) {
+ String eventPrefix, boolean compress, int delay,
int retries,
+ FlumeEventFactory factory, FlumeAvroManager
manager) {
super(name, filter, layout, handleException);
this.manager = manager;
this.mdcIncludes = includes;
@@ -66,11 +69,12 @@ public class FlumeAvroAppender extends A
this.hostname = hostname;
this.reconnectDelay = delay;
this.retries = retries;
+ this.factory = factory == null ? this : factory;
}
public void append(LogEvent event) {
- FlumeEvent flumeEvent = new FlumeEvent(event, hostname, mdcIncludes,
mdcExcludes, mdcRequired, mdcPrefix,
+ FlumeEvent flumeEvent = factory.createEvent(event, hostname,
mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
eventPrefix, compressBody);
flumeEvent.setBody(getLayout().format(flumeEvent));
manager.send(flumeEvent, reconnectDelay, retries);
@@ -82,6 +86,12 @@ public class FlumeAvroAppender extends A
manager.release();
}
+ /**
+ * Default FlumeEventFactory implementation. Builds the FlumeEvent from the
+ * arguments supplied by the caller (append passes this appender's own
+ * settings) instead of silently ignoring some of them.
+ */
+ public FlumeEvent createEvent(LogEvent event, String hostname, String
includes, String excludes, String required,
+ String mdcPrefix, String eventPrefix, boolean compress) {
+ return new FlumeEvent(event, hostname, includes, excludes,
required, mdcPrefix,
+ eventPrefix, compress);
+ }
+
@PluginFactory
public static FlumeAvroAppender createAppender(@PluginElement("agents")
Agent[] agents,
@PluginAttr("reconnectionDelay") String delay,
@@ -94,6 +104,7 @@ public class FlumeAvroAppender extends A
@PluginAttr("mdcPrefix")
String mdcPrefix,
@PluginAttr("eventPrefix")
String eventPrefix,
@PluginAttr("compress")
String compressBody,
+
@PluginAttr("flumeEventFactory") FlumeEventFactory factory,
@PluginElement("layout")
Layout layout,
@PluginElement("filters")
Filter filter) {
@@ -129,7 +140,8 @@ public class FlumeAvroAppender extends A
if (manager == null) {
return null;
}
+
return new FlumeAvroAppender(name, filter, layout, handleExceptions,
hostname, includes,
- excludes, required, mdcPrefix, eventPrefix, compress,
reconnectDelay, retries, manager);
+ excludes, required, mdcPrefix, eventPrefix, compress,
reconnectDelay, retries, factory, manager);
}
}
Modified:
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEvent.java
URL:
http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEvent.java?rev=1190645&r1=1190644&r2=1190645&view=diff
==============================================================================
---
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEvent.java
(original)
+++
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEvent.java
Fri Oct 28 22:38:43 2011
@@ -21,6 +21,7 @@ import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.message.MapMessage;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.StructuredDataId;
import org.apache.logging.log4j.message.StructuredDataMessage;
@@ -51,9 +52,9 @@ class FlumeEvent extends EventBaseImpl i
private static final String DEFAULT_EVENT_PREFIX = "";
- private static final String EVENT_TYPE = "EventType";
+ private static final String EVENT_TYPE = "eventType";
- private static final String EVENT_ID = "EventId";
+ private static final String EVENT_ID = "eventId";
private static final String GUID = "guId";
@@ -103,21 +104,39 @@ class FlumeEvent extends EventBaseImpl i
}
}
}
- if (event.getMessage() instanceof StructuredDataMessage) {
- StructuredDataMessage msg = (StructuredDataMessage)
event.getMessage();
- fields.put(eventPrefix + EVENT_TYPE, msg.getType().getBytes());
- StructuredDataId id = msg.getId();
- fields.put(eventPrefix + EVENT_ID, id.getName().getBytes());
- Map<String, String> data = msg.getData();
- for (Map.Entry<String, String> entry : data.entrySet()) {
- fields.put(eventPrefix + entry.getKey(),
entry.getValue().getBytes());
+ Message message = event.getMessage();
+ if (message instanceof MapMessage) {
+ if (message instanceof StructuredDataMessage) {
+ addStructuredData(eventPrefix, fields, (StructuredDataMessage)
message);
}
+ addMapData(eventPrefix, fields, (MapMessage) message);
}
+ addContextData(mdcPrefix, fields, ctx);
+
+ addGuid(fields);
+ }
+
+ protected void addStructuredData(String prefix, Map<String, byte[]>
fields, StructuredDataMessage msg) {
+ fields.put(prefix + EVENT_TYPE, msg.getType().getBytes());
+ StructuredDataId id = msg.getId();
+ fields.put(prefix + EVENT_ID, id.getName().getBytes());
+ }
+
+ protected void addMapData(String prefix, Map<String, byte[]> fields,
MapMessage msg) {
+ Map<String, String> data = msg.getData();
+ for (Map.Entry<String, String> entry : data.entrySet()) {
+ fields.put(prefix + entry.getKey(), entry.getValue().getBytes());
+ }
+ }
+
+ protected void addContextData(String prefix, Map<String, byte[]> fields,
Map<String, String> context) {
for (Map.Entry<String, String> entry : ctx.entrySet()) {
- fields.put(mdcPrefix + entry.getKey(),
entry.getValue().toString().getBytes());
+ fields.put(prefix + entry.getKey(),
entry.getValue().toString().getBytes());
}
+ }
+ protected void addGuid(Map<String, byte[]> fields) {
fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString().getBytes());
}
Added:
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEventFactory.java
URL:
http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEventFactory.java?rev=1190645&view=auto
==============================================================================
---
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEventFactory.java
(added)
+++
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEventFactory.java
Fri Oct 28 22:38:43 2011
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.flume;
+
+import org.apache.logging.log4j.core.LogEvent;
+
+/**
+ * Factory that creates the FlumeEvent for a given LogEvent.
+ */
+public interface FlumeEventFactory {
+ FlumeEvent createEvent(LogEvent event, String hostname, String includes,
String excludes, String required,
+ String mdcPrefix, String eventPrefix, boolean compress);
+}
Modified:
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppenderTest.java
URL:
http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppenderTest.java?rev=1190645&r1=1190644&r2=1190645&view=diff
==============================================================================
---
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppenderTest.java
(original)
+++
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppenderTest.java
Fri Oct 28 22:38:43 2011
@@ -93,7 +93,7 @@ public class FlumeAvroAppenderTest {
public void testLog4jAvroAppender() throws InterruptedException,
IOException {
Agent[] agents = new Agent[] {Agent.createAgent("localhost",
Integer.toString(testServerPort))};
FlumeAvroAppender avroAppender =
FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
- null, null, null, null, "true", null, null);
+ null, null, null, null, "true", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -167,7 +167,7 @@ public class FlumeAvroAppenderTest {
public void testConnectionRefused() {
Agent[] agents = new Agent[] {Agent.createAgent("localhost",
Integer.toString(44000))};
FlumeAvroAppender avroAppender =
FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
- null, null, null, null, "true", null, null);
+ null, null, null, null, "true", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -189,7 +189,7 @@ public class FlumeAvroAppenderTest {
public void testReconnect() throws IOException {
Agent[] agents = new Agent[] {Agent.createAgent("localhost",
Integer.toString(testServerPort))};
FlumeAvroAppender avroAppender =
FlumeAvroAppender.createAppender(agents, "500", "10", "avro", "false", null,
- null, null, null, null, "true", null, null);
+ null, null, null, null, "true", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);