Author: rgoers
Date: Sat Mar 30 23:45:45 2013
New Revision: 1462863
URL: http://svn.apache.org/r1462863
Log:
Add batch support to FlumePersistentManager
Added:
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java
Modified:
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml
Added:
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java
URL:
http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java?rev=1462863&view=auto
==============================================================================
---
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java
(added)
+++
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java
Sat Mar 30 23:45:45 2013
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.flume.event.SimpleEvent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+public class BatchEvent {
+
+ private List<SimpleEvent> events = new ArrayList<SimpleEvent>();
+
+ public void addEvent(SimpleEvent event) {
+ events.add(event);
+ }
+
+ public List<SimpleEvent> getEvents() {
+ return events;
+ }
+}
Modified:
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
URL:
http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java?rev=1462863&r1=1462862&r2=1462863&view=diff
==============================================================================
---
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
(original)
+++
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
Sat Mar 30 23:45:45 2013
@@ -119,6 +119,67 @@ public class FlumeAvroManager extends Ab
return current;
}
+ public synchronized void send(final BatchEvent events) {
+ if (client == null) {
+ client = connect(agents);
+ }
+
+ if (client != null) {
+ final List<SimpleEvent> list = events.getEvents();
+ final List<AvroFlumeEvent> batch = new
ArrayList<AvroFlumeEvent>(list.size());
+ for (SimpleEvent event : list) {
+ final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+ avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
+ avroEvent.setHeaders(new HashMap<CharSequence,
CharSequence>());
+
+ for (final Map.Entry<String, String> entry :
event.getHeaders().entrySet()) {
+ avroEvent.getHeaders().put(entry.getKey(),
entry.getValue());
+ }
+ batch.add(avroEvent);
+ }
+
+ try {
+ final Status status = client.appendBatch(batch);
+ if (status.equals(Status.OK)) {
+ return;
+ } else {
+ LOGGER.warn("RPC communication failed to " +
agents[current].getHost() +
+ ":" + agents[current].getPort());
+ }
+ } catch (final Exception ex) {
+ String msg = "Unable to write to " + getName() + " at " +
agents[current].getHost() + ":" +
+ agents[current].getPort();
+ LOGGER.warn(msg, ex);
+ }
+
+ for (int index = 0; index < agents.length; ++index) {
+ if (index == current) {
+ continue;
+ }
+ final Agent agent = agents[index];
+ try {
+ transceiver = null;
+ final AvroSourceProtocol c = connect(agent.getHost(),
agent.getPort());
+ final Status status = c.appendBatch(batch);
+ if (!status.equals(Status.OK)) {
+ final String warnMsg = "RPC communication failed to "
+ getName() + " at " +
+ agent.getHost() + ":" + agent.getPort();
+ LOGGER.warn(warnMsg);
+ continue;
+ }
+ client = c;
+ current = index;
+ return;
+ } catch (final Exception ex) {
+ final String warnMsg = "Unable to write to " + getName() +
" at " + agent.getHost() + ":" +
+ agent.getPort();
+ LOGGER.warn(warnMsg, ex);
+ }
+ }
+ }
+ throw new AppenderRuntimeException("No Flume agents are available");
+ }
+
@Override
public synchronized void send(final SimpleEvent event, int delay, int
retries) {
if (delay == 0) {
@@ -188,7 +249,7 @@ public class FlumeAvroManager extends Ab
continue;
}
client = c;
- current = i;
+ current = index;
return;
} catch (final Exception ex) {
if (i == retries - 1) {
Modified:
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
URL:
http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java?rev=1462863&r1=1462862&r2=1462863&view=diff
==============================================================================
---
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
(original)
+++
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
Sat Mar 30 23:45:45 2013
@@ -45,6 +45,7 @@ import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
/**
*
@@ -88,7 +89,7 @@ public class FlumePersistentManager exte
SecretKey secretKey) {
super(name, shortName, agents, batchSize);
this.database = database;
- this.worker = new WriterThread(database, this, queue, secretKey);
+ this.worker = new WriterThread(database, this, queue, batchSize,
secretKey);
this.worker.start();
this.reconnectionDelay = reconnectionDelay <= 0 ? DEFAULT_DELAY :
reconnectionDelay;
this.secretKey = secretKey;
@@ -113,7 +114,7 @@ public class FlumePersistentManager exte
}
String dataDirectory = dataDir == null || dataDir.length() == 0 ?
DEFAULT_DATA_DIR : dataDir;
- final StringBuilder sb = new StringBuilder("FlumeKrati[");
+ final StringBuilder sb = new StringBuilder("FlumePersistent[");
boolean first = true;
for (final Agent agent : agents) {
if (!first) {
@@ -300,12 +301,14 @@ public class FlumePersistentManager exte
private final FlumePersistentManager manager;
private final LinkedBlockingQueue<byte[]> queue;
private final SecretKey secretKey;
+ private final int batchSize;
public WriterThread(Database database, FlumePersistentManager manager,
LinkedBlockingQueue<byte[]> queue,
- SecretKey secretKey) {
+ int batchsize, SecretKey secretKey) {
this.database = database;
this.manager = manager;
this.queue = queue;
+ this.batchSize = batchsize;
this.secretKey = secretKey;
}
@@ -323,93 +326,130 @@ public class FlumePersistentManager exte
@Override
public void run() {
LOGGER.trace("WriterThread started");
+ long lastBatch = System.currentTimeMillis();
while (!shutdown) {
- try {
- boolean errors = false;
- final DatabaseEntry key = new DatabaseEntry();
- final DatabaseEntry data = new DatabaseEntry();
- final Cursor cursor = database.openCursor(null, null);
+ if (database.count() >= batchSize ||
+ database.count() > 0 && lastBatch +
manager.reconnectionDelay > System.currentTimeMillis()) {
try {
- queue.clear();
- OperationStatus status;
+ boolean errors = false;
+ DatabaseEntry key = new DatabaseEntry();
+ final DatabaseEntry data = new DatabaseEntry();
+ final Cursor cursor = database.openCursor(null, null);
try {
- status = cursor.getFirst(key, data, LockMode.RMW);
-
- while (status == OperationStatus.SUCCESS) {
- SimpleEvent event = new SimpleEvent();
- try {
- byte[] eventData = data.getData();
- if (secretKey != null) {
- Cipher cipher =
Cipher.getInstance("AES");
- cipher.init(Cipher.DECRYPT_MODE,
secretKey);
- eventData = cipher.doFinal(eventData);
- }
- ByteArrayInputStream bais = new
ByteArrayInputStream(eventData);
- DataInputStream dais = new
DataInputStream(bais);
- int length = dais.readInt();
- byte[] bytes = new byte[length];
- dais.read(bytes, 0, length);
- event.setBody(bytes);
- length = dais.readInt();
- Map<String, String> map = new
HashMap<String, String>(length);
- for (int i = 0; i < length; ++i) {
- String headerKey = dais.readUTF();
- String value = dais.readUTF();
- map.put(headerKey, value);
+ queue.clear();
+ OperationStatus status;
+ try {
+ status = cursor.getFirst(key, data,
LockMode.RMW);
+ if (batchSize > 1) {
+ BatchEvent batch = new BatchEvent();
+ while (status == OperationStatus.SUCCESS) {
+ SimpleEvent event = createEvent(data);
+ if (event != null) {
+ batch.addEvent(event);
+ }
+ status = cursor.getNext(key, data,
LockMode.RMW);
}
- event.setHeaders(map);
- } catch (Exception ex) {
- errors = true;
- LOGGER.error("Error retrieving event", ex);
- continue;
- }
- try {
- manager.doSend(event);
- } catch (Exception ioe) {
- errors = true;
- LOGGER.error("Error sending event", ioe);
- break;
- }
- if (!errors) {
try {
- cursor.delete();
- } catch (Exception ex) {
- LOGGER.error("Unable to delete event",
ex);
+ manager.send(batch);
+ lastBatch = System.currentTimeMillis();
+ } catch (Exception ioe) {
+ LOGGER.error("Error sending events",
ioe);
+ break;
+ }
+ for (SimpleEvent event :
batch.getEvents()) {
+ try {
+ Map<String, String> headers =
event.getHeaders();
+ key = new
DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
+ database.delete(null, key);
+ } catch (Exception ex) {
+ LOGGER.error("Error deleting key
from database", ex);
+ }
+ }
+ } else {
+ while (status == OperationStatus.SUCCESS) {
+ SimpleEvent event = createEvent(data);
+ if (event != null) {
+ try {
+ manager.doSend(event);
+ } catch (Exception ioe) {
+ errors = true;
+ LOGGER.error("Error sending
event", ioe);
+ break;
+ }
+ if (!errors) {
+ try {
+ cursor.delete();
+ } catch (Exception ex) {
+ LOGGER.error("Unable to
delete event", ex);
+ }
+ }
+ }
+ status = cursor.getNext(key, data,
LockMode.RMW);
}
}
- status = cursor.getNext(key, data,
LockMode.RMW);
+ } catch (Exception ex) {
+ LOGGER.error("Error reading database", ex);
+ shutdown = true;
+ break;
}
- } catch (Exception ex) {
- LOGGER.error("Error reading database", ex);
- shutdown = true;
- break;
- }
- } finally {
- cursor.close();
- }
- if (errors) {
- Thread.sleep(manager.reconnectionDelay);
- continue;
+ } finally {
+ cursor.close();
+ }
+ if (errors) {
+ Thread.sleep(manager.reconnectionDelay);
+ continue;
+ }
+ } catch (Exception ex) {
+ LOGGER.warn("WriterThread encountered an exception.
Continuing.", ex);
}
- } catch (Exception ex) {
- LOGGER.warn("WriterThread encountered an exception.
Continuing.", ex);
- }
- try {
- if (database.count() > 0) {
- continue;
+ } else {
+ try {
+ if (database.count() >= batchSize) {
+ continue;
+ }
+ queue.poll(manager.reconnectionDelay,
TimeUnit.MILLISECONDS);
+ LOGGER.debug("WriterThread notified of work");
+ } catch (InterruptedException ie) {
+ LOGGER.warn("WriterThread interrupted, continuing");
+ } catch (Exception ex) {
+ LOGGER.error("WriterThread encountered an exception
waiting for work", ex);
+ break;
}
- queue.take();
- LOGGER.debug("WriterThread notified of work");
- } catch (InterruptedException ie) {
- LOGGER.warn("WriterThread interrupted, continuing");
- } catch (Exception ex) {
- LOGGER.error("WriterThread encountered an exception
waiting for work", ex);
- break;
}
}
LOGGER.trace("WriterThread exiting");
}
+ private SimpleEvent createEvent(DatabaseEntry data) {
+ SimpleEvent event = new SimpleEvent();
+ try {
+ byte[] eventData = data.getData();
+ if (secretKey != null) {
+ Cipher cipher = Cipher.getInstance("AES");
+ cipher.init(Cipher.DECRYPT_MODE, secretKey);
+ eventData = cipher.doFinal(eventData);
+ }
+ ByteArrayInputStream bais = new
ByteArrayInputStream(eventData);
+ DataInputStream dais = new DataInputStream(bais);
+ int length = dais.readInt();
+ byte[] bytes = new byte[length];
+ dais.read(bytes, 0, length);
+ event.setBody(bytes);
+ length = dais.readInt();
+ Map<String, String> map = new HashMap<String, String>(length);
+ for (int i = 0; i < length; ++i) {
+ String headerKey = dais.readUTF();
+ String value = dais.readUTF();
+ map.put(headerKey, value);
+ }
+ event.setHeaders(map);
+ return event;
+ } catch (Exception ex) {
+ LOGGER.error("Error retrieving event", ex);
+ return null;
+ }
+ }
+
}
}
Modified:
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
URL:
http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java?rev=1462863&r1=1462862&r2=1462863&view=diff
==============================================================================
---
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
(original)
+++
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
Sat Mar 30 23:45:45 2013
@@ -215,7 +215,7 @@ public class FlumePersistentAppenderTest
Assert.assertTrue("Channel contained event, but not expected
message " + i, fields[i]);
}
}
-
+ /*
@Test
public void testPerformance() throws Exception {
long start = System.currentTimeMillis();
@@ -227,7 +227,7 @@ public class FlumePersistentAppenderTest
}
long elapsed = System.currentTimeMillis() - start;
System.out.println("Time to log " + count + " events " + elapsed +
"ms");
- }
+ } */
private String getBody(final Event event) throws IOException {
@@ -292,12 +292,15 @@ public class FlumePersistentAppenderTest
public Status append(AvroFlumeEvent event) throws AvroRemoteException {
eventQueue.add(event);
+ //System.out.println("Received event " +
event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
return Status.OK;
}
- public Status appendBatch(List<AvroFlumeEvent> events)
- throws AvroRemoteException {
+ public Status appendBatch(List<AvroFlumeEvent> events) throws
AvroRemoteException {
Preconditions.checkState(eventQueue.addAll(events));
+ for (AvroFlumeEvent event : events) {
+ // System.out.println("Received event " +
event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
+ }
return Status.OK;
}
}
Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml
URL:
http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml?rev=1462863&r1=1462862&r2=1462863&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml
(original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml Sat
Mar 30 23:45:45 2013
@@ -1,7 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="info" name="MyApp"
packages="org.apache.logging.log4j.flume.test">
<appenders>
- <Flume name="eventLogger" suppressExceptions="false" compress="true"
type="persistent" dataDir="target/persistent">
+ <Flume name="eventLogger" suppressExceptions="false" compress="true"
type="persistent" dataDir="target/persistent"
+ batchsize="100">
<Agent host="localhost" port="${sys:primaryPort}"/>
<Agent host="localhost" port="${sys:alternatePort}"/>
<RFC5424Layout enterpriseNumber="18060" includeMDC="true"
appName="MyApp"/>