Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroManager.java URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroManager.java?rev=1226514&view=auto ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroManager.java (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroManager.java Mon Jan 2 18:52:50 2012 @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.flumeog.appender; + +import com.cloudera.flume.handlers.avro.AvroFlumeEvent; +import org.apache.avro.ipc.HttpTransceiver; +import org.apache.avro.ipc.specific.SpecificRequestor; +import org.apache.logging.log4j.core.appender.AbstractManager; +import org.apache.logging.log4j.core.appender.AppenderRuntimeException; +import org.apache.logging.log4j.core.appender.ManagerFactory; + +import com.cloudera.flume.handlers.avro.FlumeEventAvroServer; +import com.cloudera.flume.handlers.avro.AvroEventConvertUtil; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; + +/** + * Manager for FlumeAvroAppenders. + */ +public class FlumeAvroManager extends AbstractManager { + + /** + The default reconnection delay (500 milliseconds or .5 seconds). + */ + public static final int DEFAULT_RECONNECTION_DELAY = 500; + + private static final int DEFAULT_RECONNECTS = 3; + + private static ManagerFactory factory = new AvroManagerFactory(); + + private FlumeEventAvroServer client; + + private final Agent[] agents; + + private int current = 0; + + protected FlumeAvroManager(String name, Agent[] agents) { + super(name); + this.agents = agents; + this.client = connect(agents); + } + + /** + * Return a FlumeAvroManager. + * @param agents The agents to use. + * @return A FlumeAvroManager. + */ + public static FlumeAvroManager getManager(Agent[] agents) { + if (agents == null || agents.length == 0) { + throw new IllegalArgumentException("At least one agent is required"); + } + + StringBuilder sb = new StringBuilder("FlumeAvro["); + boolean first = true; + for (Agent agent : agents) { + if (!first) { + sb.append(","); + } + sb.append(agent.getHost()).append(":").append(agent.getPort()); + first = false; + } + sb.append("]"); + return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(agents)); + } + + /** + * Return the agents. + * @return The agent array. + */ + public Agent[] getAgents() { + return agents; + } + + /** + * Returns the index of the current agent. + * @return The index for the current agent. + */ + public int getCurrent() { + return current; + } + + protected synchronized void send(FlumeEvent event, int delay, int retries) { + if (delay == 0) { + delay = DEFAULT_RECONNECTION_DELAY; + } + if (retries == 0) { + retries = DEFAULT_RECONNECTS; + } + AvroFlumeEvent avroEvent = AvroEventConvertUtil.toAvroEvent(event); + int i = 0; + + String msg = "Error writing to " + getName(); + + do { + try { + client.append(avroEvent); + return; + } catch (Exception ex) { + if (i == retries - 1) { + msg = "Error writing to " + getName() + " at " + agents[0].getHost() + ":" + agents[0].getPort(); + LOGGER.warn(msg, ex); + break; + } + sleep(delay); + } + } while (++i < retries); + + for (int index = 0; index < agents.length; ++index) { + if (index == current) { + continue; + } + Agent agent = agents[index]; + i = 0; + do { + try { + + FlumeEventAvroServer c = connect(agent.getHost(), agent.getPort()); + c.append(avroEvent); + client = c; + current = i; + return; + } catch (Exception ex) { + if (i == retries - 1) { + String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" + + agent.getPort(); + LOGGER.warn(warnMsg, ex); + break; + } + sleep(delay); + } + } while (++i < retries); + } + + throw new AppenderRuntimeException(msg); + + } + + private void sleep(int delay) { + try { + Thread.sleep(delay); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + /** + * There is a very good chance that this will always return the first agent even if it isn't available. + * @param agents The list of agents to choose from + * @return The FlumeEventAvroServer. + */ + private FlumeEventAvroServer connect(Agent[] agents) { + int i = 0; + for (Agent agent : agents) { + FlumeEventAvroServer server = connect(agent.getHost(), agent.getPort()); + if (server != null) { + current = i; + return server; + } + ++i; + } + throw new AppenderRuntimeException("Unable to connect to any agents"); + } + + private FlumeEventAvroServer connect(String hostname, int port) { + URL url; + + try { + url = new URL("http", hostname, port, "/"); + } catch (MalformedURLException ex) { + LOGGER.error("Unable to create a URL for hostname " + hostname + " at port " + port, ex); + return null; + } + + try { + return SpecificRequestor.getClient(FlumeEventAvroServer.class, new HttpTransceiver(url)); + } catch (IOException ioe) { + LOGGER.error("Unable to create Avro client"); + return null; + } + } + + /** + * Factory data. + */ + private static class FactoryData { + private Agent[] agents; + + /** + * Constructor. + * @param agents The agents. + */ + public FactoryData(Agent[] agents) { + this.agents = agents; + } + } + + /** + * Avro Manager Factory. + */ + private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> { + + /** + * Create the FlumeAvroManager. + * @param name The name of the entity to manage. + * @param data The data required to create the entity. + * @return The FlumeAvroManager. + */ + public FlumeAvroManager createManager(String name, FactoryData data) { + try { + + return new FlumeAvroManager(name, data.agents); + } catch (Exception ex) { + LOGGER.error("Could not create FlumeAvroManager", ex); + } + return null; + } + } + +}
Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEvent.java URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEvent.java?rev=1226514&view=auto ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEvent.java (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEvent.java Mon Jan 2 18:52:50 2012 @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.flumeog.appender; + +import com.cloudera.flume.core.EventBaseImpl; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LoggingException; +import org.apache.logging.log4j.Marker; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.helpers.UUIDUtil; +import org.apache.logging.log4j.message.MapMessage; +import org.apache.logging.log4j.message.Message; +import org.apache.logging.log4j.message.StructuredDataId; +import org.apache.logging.log4j.message.StructuredDataMessage; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; +import java.util.zip.GZIPOutputStream; + +/** + * Class that is both a Flume and Log4j Event. + */ +public class FlumeEvent extends EventBaseImpl implements LogEvent { + + private static final String DEFAULT_MDC_PREFIX = "mdc:"; + + private static final String DEFAULT_EVENT_PREFIX = ""; + + private static final String EVENT_TYPE = "eventType"; + + private static final String EVENT_ID = "eventId"; + + private static final String GUID = "guId"; + + private final LogEvent event; + + private byte[] body; + + private final String hostname; + + private final Map<String, String> ctx = new HashMap<String, String>(); + + private final boolean compress; + + /** + * Construct the FlumeEvent. + * @param event The Log4j LogEvent. + * @param hostname The host name. + * @param includes A comma separated list of MDC elements to include. + * @param excludes A comma separated list of MDC elements to exclude. + * @param required A comma separated list of MDC elements that are required to be defined. + * @param mdcPrefix The value to prefix to MDC keys. + * @param eventPrefix The value to prefix to event keys. + * @param compress If true the event body should be compressed. + */ + public FlumeEvent(LogEvent event, String hostname, String includes, String excludes, String required, + String mdcPrefix, String eventPrefix, boolean compress) { + this.event = event; + this.hostname = hostname; + this.compress = compress; + if (mdcPrefix == null) { + mdcPrefix = DEFAULT_MDC_PREFIX; + } + if (eventPrefix == null) { + eventPrefix = DEFAULT_EVENT_PREFIX; + } + this.fields = new HashMap<String, byte[]>(); + Map<String, String> mdc = event.getContextMap(); + if (includes != null) { + String[] array = includes.split(","); + if (array.length > 0) { + for (String str : array) { + if (mdc.containsKey(str)) { + ctx.put(str, mdc.get(str)); + } + } + } + } else if (excludes != null) { + String[] array = excludes.split(","); + if (array.length > 0) { + List<String> list = Arrays.asList(array); + for (Map.Entry<String, String> entry : mdc.entrySet()) { + if (!list.contains(entry.getKey())) { + ctx.put(entry.getKey(), entry.getValue()); + } + } + } + } + + if (required != null) { + String[] array = required.split(","); + if (array.length > 0) { + for (String str : array) { + if (!mdc.containsKey(str)) { + throw new LoggingException("Required key " + str + " is missing from the MDC"); + } + } + } + } + Message message = event.getMessage(); + if (message instanceof MapMessage) { + if (message instanceof StructuredDataMessage) { + addStructuredData(eventPrefix, fields, (StructuredDataMessage) message); + } + addMapData(eventPrefix, fields, (MapMessage) message); + } + + addContextData(mdcPrefix, fields, ctx); + + addGuid(fields); + } + + protected void addStructuredData(String prefix, Map<String, byte[]> fields, StructuredDataMessage msg) { + fields.put(prefix + EVENT_TYPE, msg.getType().getBytes()); + StructuredDataId id = msg.getId(); + fields.put(prefix + EVENT_ID, id.getName().getBytes()); + } + + protected void addMapData(String prefix, Map<String, byte[]> fields, MapMessage msg) { + Map<String, String> data = msg.getData(); + for (Map.Entry<String, String> entry : data.entrySet()) { + fields.put(prefix + entry.getKey(), entry.getValue().getBytes()); + } + } + + protected void addContextData(String prefix, Map<String, byte[]> fields, Map<String, String> context) { + for (Map.Entry<String, String> entry : ctx.entrySet()) { + fields.put(prefix + entry.getKey(), entry.getValue().toString().getBytes()); + } + } + + protected void addGuid(Map<String, byte[]> fields) { + fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString().getBytes()); + } + + /** + * Set the body in the event. + * @param body The body to add to the event. + */ + public void setBody(byte[] body) { + if (body == null || body.length == 0) { + this.body = new byte[0]; + return; + } + if (compress) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + GZIPOutputStream os = new GZIPOutputStream(baos); + os.write(body); + os.close(); + } catch (IOException ioe) { + throw new LoggingException("Unable to compress message", ioe); + } + this.body = baos.toByteArray(); + } else { + this.body = body; + } + } + + @Override + public byte[] getBody() { + return this.body; + } + + @Override + public Priority getPriority() { + switch (event.getLevel()) { + case INFO: + return Priority.INFO; + case ERROR: + return Priority.ERROR; + case DEBUG: + return Priority.DEBUG; + case WARN: + return Priority.WARN; + case TRACE: + return Priority.TRACE; + case FATAL: + return Priority.FATAL; + } + return Priority.INFO; + } + + /** + * Get the Frequently Qualified Class Name. + * @return the FQCN String. + */ + public String getFQCN() { + return event.getFQCN(); + } + + @Override + public long getTimestamp() { + return event.getMillis(); + } + + @Override + public long getNanos() { + return System.nanoTime(); + } + + @Override + public String getHost() { + return hostname; + } + + /** + * Return the logging Level. + * @return the Level. + */ + public Level getLevel() { + return event.getLevel(); + } + + /** + * Return the logger name. + * @return the logger name. + */ + public String getLoggerName() { + return event.getLoggerName(); + } + + /** + * Return the StackTraceElement for the caller of the logging API. + * @return the StackTraceElement of the caller. + */ + public StackTraceElement getSource() { + return event.getSource(); + } + + /** + * Return the Message. + * @return the Message. + */ + public Message getMessage() { + return event.getMessage(); + } + + /** + * Return the Marker. + * @return the Marker. + */ + public Marker getMarker() { + return event.getMarker(); + } + + /** + * Return the name of the Thread. + * @return the name of the Thread. + */ + public String getThreadName() { + return event.getThreadName(); + } + + /** + * Return the event timestamp. + * @return the event timestamp. + */ + public long getMillis() { + return event.getMillis(); + } + + /** + * Return the Throwable associated with the event, if any. + * @return the Throwable. + */ + public Throwable getThrown() { + return event.getThrown(); + } + + /** + * Return a copy of the context Map. + * @return a copy of the context Map. + */ + public Map<String, String> getContextMap() { + return ctx; + } + + /** + * Return a copy of the context stack. + * @return a copy of the context stack. + */ + public Stack<String> getContextStack() { + return event.getContextStack(); + } +} Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEventFactory.java URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEventFactory.java?rev=1226514&view=auto ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEventFactory.java (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEventFactory.java Mon Jan 2 18:52:50 2012 @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.flumeog.appender; + +import org.apache.logging.log4j.core.LogEvent; + +/** + * Factory to create Flume events. + */ +public interface FlumeEventFactory { + /** + * Create a Flume event. + * @param event The Log4j LogEvent. + * @param hostname The name of the host. + * @param includes A comma separated list of MDC elements to include. + * @param excludes A comma separated list of MDC elements to exclude. + * @param required A comma separated list of MDC elements that are required. + * @param mdcPrefix The value to prefix to MDC keys. + * @param eventPrefix The value to prefix to event keys. + * @param compress If true the event body should be compressed. + * @return A FlumeEvent. + */ + FlumeEvent createEvent(LogEvent event, String hostname, String includes, String excludes, String required, + String mdcPrefix, String eventPrefix, boolean compress); +} Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/package-info.java URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/package-info.java?rev=1226514&view=auto ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/package-info.java (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/package-info.java Mon Jan 2 18:52:50 2012 @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +/** + * Apache Flume Appender. Requires the user specifically include Flume and its dependencies. + */ +package org.apache.logging.log4j.flumeog.appender; Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppenderTest.java URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppenderTest.java?rev=1226514&view=auto ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppenderTest.java (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppenderTest.java Mon Jan 2 18:52:50 2012 @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.flumeog.appender; + +import com.cloudera.flume.core.Event; +import com.cloudera.flume.handlers.avro.AvroEventSource; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.Logger; +import org.apache.logging.log4j.core.LoggerContext; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.zip.GZIPInputStream; + +/** + * + */ +public class FlumeAvroAppenderTest { + + private LoggerContext ctx = (LoggerContext) LogManager.getContext(); + + private static final String LOGBACK_CONF = "logback.configurationFile"; + private static final String LOGBACK_CONFIG = "logback-flume.xml"; + + private static final int testServerPort = 12345; + private static final int testEventCount = 100; + + private AvroEventSource eventSource; + private Logger avroLogger; + + @BeforeClass + public static void setupClass() { + System.setProperty(LOGBACK_CONF, LOGBACK_CONFIG); + } + + @AfterClass + public static void cleanupClass() { + System.clearProperty(LOGBACK_CONF); + } + + @Before + public void setUp() throws IOException { + eventSource = new AvroEventSource(testServerPort); + avroLogger = (Logger) LogManager.getLogger("avrologger"); + /* + * Clear out all other appenders associated with this logger to ensure we're + * only hitting the Avro appender. + */ + removeAppenders(avroLogger); + eventSource.open(); + } + + @After + public void teardown() throws IOException { + removeAppenders(avroLogger); + eventSource.close(); + } + + @Test + public void testLog4jAvroAppender() throws InterruptedException, IOException { + Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(testServerPort))}; + FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null, + null, null, null, null, "true", null, null, null); + avroAppender.start(); + avroLogger.addAppender(avroAppender); + avroLogger.setLevel(Level.ALL); + + Assert.assertNotNull(avroLogger); + + int loggedCount = 0; + int receivedCount = 0; + + for (int i = 0; i < testEventCount; i++) { + avroLogger.info("test i:" + i); + loggedCount++; + } + + /* + * We perform this in another thread so we can put a time SLA on it by using + * Future#get(). Internally, the AvroEventSource uses a BlockingQueue. + */ + ExecutorService executor = Executors.newSingleThreadExecutor(); + Callable<Event> callable = new Callable<Event>() { + + public Event call() throws Exception { + return eventSource.next(); + } + }; + + for (int i = 0; i < loggedCount; i++) { + try { + Future<Event> future = executor.submit(callable); + + /* + * We must receive events in less than 1 second. This should be more + * than enough as all events should be held in AvroEventSource's + * BlockingQueue. + */ + Event event = future.get(1, TimeUnit.SECONDS); + + Assert.assertNotNull(event); + Assert.assertNotNull(event.getBody()); + String body = getBody(event); + Assert.assertTrue(body.endsWith("test i:" + i)); + + receivedCount++; + } catch (ExecutionException e) { + Assert.fail("Flume failed to handle an event: " + e.getMessage()); + break; + } catch (TimeoutException e) { + Assert + .fail("Flume failed to handle an event within the given time SLA: " + + e.getMessage()); + break; + } catch (InterruptedException e) { + Assert + .fail("Flume source executor thread was interrupted. We count this as a failure."); + Thread.currentThread().interrupt(); + break; + } + } + + executor.shutdown(); + + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + throw new IllegalStateException( + "Executor is refusing to shutdown cleanly"); + } + + Assert.assertEquals(loggedCount, receivedCount); + } + + @Test + public void testConnectionRefused() { + Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(44000))}; + FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null, + null, null, null, null, "true", null, null, null); + avroAppender.start(); + avroLogger.addAppender(avroAppender); + avroLogger.setLevel(Level.ALL); + + boolean caughtException = false; + + try { + avroLogger.info("message 1"); + } catch (Throwable t) { + //logger.debug("Logging to a non-existant server failed (as expected)", t); + + caughtException = true; + } + + Assert.assertTrue(caughtException); + } + + @Test + public void testReconnect() throws IOException { + Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(testServerPort))}; + FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "500", "10", "avro", "false", null, + null, null, null, null, "true", null, null, null); + avroAppender.start(); + avroLogger.addAppender(avroAppender); + avroLogger.setLevel(Level.ALL); + avroLogger.info("message 1"); + + Event event = eventSource.next(); + + Assert.assertNotNull(event); + String body = getBody(event); + Assert.assertTrue(body.endsWith("message 1")); + + eventSource.close(); + + Callable<Void> logCallable = new Callable<Void>() { + + public Void call() throws Exception { + avroLogger.info("message 2"); + return null; + } + }; + + ExecutorService logExecutor = Executors.newSingleThreadExecutor(); + + boolean caughtException = false; + + try { + logExecutor.submit(logCallable); + + Thread.sleep(1500); + + eventSource.open(); + + logExecutor.shutdown(); + + if (!logExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + throw new IllegalStateException( + "Log executor is refusing to shutdown cleanly"); + } + } catch (Throwable t) { + System.err.println("Failed to reestablish a connection and log to an avroSource"); + + caughtException = true; + } + + Assert.assertFalse(caughtException); + + event = eventSource.next(); + + Assert.assertNotNull(event); + body = getBody(event); + Assert.assertTrue(body.endsWith("message 2")); + + caughtException = false; + + try { + avroLogger.info("message 3"); + } catch (Throwable t) { + System.err.println("Logging to a closed server failed (not expected)"); + + caughtException = true; + } + + Assert.assertFalse(caughtException); + + event = eventSource.next(); + + Assert.assertNotNull(event); + body = getBody(event); + Assert.assertTrue(body.endsWith("message 3")); + } + + + private void removeAppenders(Logger logger) { + Map<String,Appender> map = logger.getAppenders(); + for (Map.Entry<String, Appender> entry : map.entrySet()) { + Appender app = entry.getValue(); + avroLogger.removeAppender(app); + app.stop(); + } + } + + private Appender getAppender(Logger logger, String name) { + Map<String,Appender> map = logger.getAppenders(); + return map.get(name); + } + + private String getBody(Event event) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody())); + int n = 0; + while (-1 != (n = is.read())) { + baos.write(n); + } + return new String(baos.toByteArray()); + + } +} Modified: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/pom.xml URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/pom.xml?rev=1226514&r1=1226513&r2=1226514&view=diff ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/pom.xml (original) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/pom.xml Mon Jan 2 18:52:50 2012 @@ -232,59 +232,53 @@ </plugin> --> </plugins> </build> - <reporting> - <plugins> - <!-- Changes report --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-changes-plugin</artifactId> - <version>2.6</version> - <reportSets> - <reportSet> - <reports> - <report>changes-report</report> - <report>jira-report</report> - </reports> - </reportSet> - </reportSets> - <configuration> - <statusIds>Resolved, Closed</statusIds> - <columnNames>Type,Key,Summary,Assignee,Status,Resolution,Fix Version</columnNames> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-project-info-reports-plugin</artifactId> - <version>2.4</version> - </plugin> - <!-- Surefire report --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-report-plugin</artifactId> - <version>2.11</version> - </plugin> + <reporting> + <plugins> + <!-- Changes report --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-changes-plugin</artifactId> + <version>2.6</version> + <reportSets> + <reportSet> + <reports> + <report>changes-report</report> + <report>jira-report</report> + </reports> + </reportSet> + </reportSets> + <configuration> + <statusIds>Resolved, Closed</statusIds> + <columnNames>Type,Key,Summary,Assignee,Status,Resolution,Fix Version</columnNames> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-project-info-reports-plugin</artifactId> + <version>2.4</version> + </plugin> + <!-- Surefire report --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + <version>2.11</version> + </plugin> - <!-- RAT report --> - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <version>0.8</version> - <configuration> - <excludes> - <exclude>.idea/**/*</exclude> - <exclude>src/test/resources/**/*</exclude> - </excludes> - </configuration> - </plugin> - </plugins> - </reporting> + <!-- RAT report --> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <version>0.8</version> + <configuration> + <excludes> + <exclude>.idea/**/*</exclude> + <exclude>src/test/resources/**/*</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </reporting> - <repositories> - <repository> - <id>cloudera</id> - <url>https://repository.cloudera.com/content/repositories/releases/</url> - </repository> - </repositories> <distributionManagement> <repository> <id>apache.releases.https</id> @@ -308,4 +302,25 @@ <module>slf4j-impl</module> <module>log4j2-jcl</module> </modules> + <profiles> + <profile> + <id>include-flume</id> + <modules> + <module>log4j2-flume-og</module> + <module>log4j2-flume-ng</module> + </modules> + </profile> + <profile> + <id>include-flume-ng</id> + <modules> + <module>log4j2-flume-ng</module> + </modules> + </profile> + <profile> + <id>include-flume-og</id> + <modules> + <module>log4j2-flume-og</module> + </modules> + </profile> + </profiles> </project>
