Internal error messages start with a capital letter. Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/9e3ba659 Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/9e3ba659 Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/9e3ba659
Branch: refs/heads/LOG4J2-1010&LOG4J2-1447-injectable-contextdata&better-datastructure Commit: 9e3ba65942a756ea50474242e17310553aa002bd Parents: 9d757a1 Author: Gary Gregory <ggreg...@apache.org> Authored: Wed Aug 3 12:04:46 2016 -0700 Committer: Gary Gregory <ggreg...@apache.org> Committed: Wed Aug 3 12:04:46 2016 -0700 ---------------------------------------------------------------------- .../appender/db/AbstractDatabaseManager.java | 2 +- .../appender/db/jdbc/JdbcDatabaseManager.java | 4 +- .../appender/db/jpa/JpaDatabaseManager.java | 2 +- .../flume/appender/FlumePersistentManager.java | 1758 +++++++++--------- 4 files changed, 883 insertions(+), 883 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/9e3ba659/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java index 8d58ab6..82c7b18 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java @@ -88,7 +88,7 @@ public abstract class AbstractDatabaseManager extends AbstractManager implements try { this.shutdownInternal(); } catch (final Exception e) { - logWarn("caught exception while performing database shutdown operations", e); + logWarn("Caught exception while performing database shutdown operations", e); } finally { this.running = false; } http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/9e3ba659/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/jdbc/JdbcDatabaseManager.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/jdbc/JdbcDatabaseManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/jdbc/JdbcDatabaseManager.java index a1d6204..76e1901 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/jdbc/JdbcDatabaseManager.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/jdbc/JdbcDatabaseManager.java @@ -144,7 +144,7 @@ public final class JdbcDatabaseManager extends AbstractDatabaseManager { try { Closer.close(this.statement); } catch (final Exception e) { - logWarn("failed to close SQL statement logging event or flushing buffer", e); + logWarn("Failed to close SQL statement logging event or flushing buffer", e); } finally { this.statement = null; } @@ -152,7 +152,7 @@ public final class JdbcDatabaseManager extends AbstractDatabaseManager { try { Closer.close(this.connection); } catch (final Exception e) { - logWarn("failed to close database connection logging event or flushing buffer", e); + logWarn("Failed to close database connection logging event or flushing buffer", e); } finally { this.connection = null; } http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/9e3ba659/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/jpa/JpaDatabaseManager.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/jpa/JpaDatabaseManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/jpa/JpaDatabaseManager.java index 480c8b7..17c5642 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/jpa/JpaDatabaseManager.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/jpa/JpaDatabaseManager.java @@ -125,7 +125,7 @@ public final class JpaDatabaseManager extends AbstractDatabaseManager { this.entityManager.close(); } } catch (final Exception e) { - logWarn("failed to close entity manager while logging event or flushing buffer", e); + logWarn("Failed to close entity manager while logging event or flushing buffer", e); } finally { this.entityManager = null; } http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/9e3ba659/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java ---------------------------------------------------------------------- diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java index 4134671..8a83ca7 100644 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java +++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java @@ -1,879 +1,879 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache license, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the license for the specific language governing permissions and - * limitations under the license. - */ -package org.apache.logging.log4j.flume.appender; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import javax.crypto.Cipher; -import javax.crypto.SecretKey; - -import org.apache.flume.Event; -import org.apache.flume.event.SimpleEvent; -import org.apache.logging.log4j.LoggingException; -import org.apache.logging.log4j.core.appender.ManagerFactory; -import org.apache.logging.log4j.core.config.Property; -import org.apache.logging.log4j.core.config.plugins.util.PluginManager; -import org.apache.logging.log4j.core.config.plugins.util.PluginType; -import org.apache.logging.log4j.core.util.FileUtils; -import org.apache.logging.log4j.core.util.Log4jThread; -import org.apache.logging.log4j.core.util.SecretKeyProvider; -import org.apache.logging.log4j.util.Strings; - -import com.sleepycat.je.Cursor; -import com.sleepycat.je.CursorConfig; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.LockConflictException; -import com.sleepycat.je.LockMode; -import com.sleepycat.je.OperationStatus; -import com.sleepycat.je.StatsConfig; -import com.sleepycat.je.Transaction; - -/** - * Manager that persists data to Berkeley DB before passing it on to Flume. - */ -public class FlumePersistentManager extends FlumeAvroManager { - - /** Attribute name for the key provider. */ - public static final String KEY_PROVIDER = "keyProvider"; - - private static final Charset UTF8 = StandardCharsets.UTF_8; - - private static final String DEFAULT_DATA_DIR = ".log4j/flumeData"; - - private static final int SHUTDOWN_WAIT = 60; - - private static final int MILLIS_PER_SECOND = 1000; - - private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500; - - private static BDBManagerFactory factory = new BDBManagerFactory(); - - private final Database database; - - private final Environment environment; - - private final WriterThread worker; - - private final Gate gate = new Gate(); - - private final SecretKey secretKey; - - private final int lockTimeoutRetryCount; - - private final ExecutorService threadPool; - - private final AtomicLong dbCount = new AtomicLong(); - - /** - * Constructor - * @param name The unique name of this manager. - * @param shortName Original name for the Manager. - * @param agents An array of Agents. - * @param batchSize The number of events to include in a batch. - * @param retries The number of times to retry connecting before giving up. - * @param connectionTimeout The amount of time to wait for a connection to be established. - * @param requestTimeout The amount of time to wair for a response to a request. - * @param delay The amount of time to wait between retries. - * @param database The database to write to. - * @param environment The database environment. - * @param secretKey The SecretKey to use for encryption. - * @param lockTimeoutRetryCount The number of times to retry a lock timeout. - */ - protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents, - final int batchSize, final int retries, final int connectionTimeout, - final int requestTimeout, final int delay, final Database database, - final Environment environment, final SecretKey secretKey, - final int lockTimeoutRetryCount) { - super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout); - this.database = database; - this.environment = environment; - dbCount.set(database.count()); - this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount, - lockTimeoutRetryCount); - this.worker.start(); - this.secretKey = secretKey; - this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory()); - this.lockTimeoutRetryCount = lockTimeoutRetryCount; - } - - - /** - * Returns a FlumeAvroManager. - * @param name The name of the manager. - * @param agents The agents to use. - * @param properties Properties to pass to the Manager. - * @param batchSize The number of events to include in a batch. - * @param retries The number of times to retry connecting before giving up. - * @param connectionTimeout The amount of time to wait to establish a connection. - * @param requestTimeout The amount of time to wait for a response to a request. - * @param delayMillis Amount of time to delay before delivering a batch. - * @param lockTimeoutRetryCount The number of times to retry after a lock timeout. - * @param dataDir The location of the Berkeley database. - * @return A FlumeAvroManager. - */ - public static FlumePersistentManager getManager(final String name, final Agent[] agents, - final Property[] properties, int batchSize, final int retries, - final int connectionTimeout, final int requestTimeout, - final int delayMillis, final int lockTimeoutRetryCount, - final String dataDir) { - if (agents == null || agents.length == 0) { - throw new IllegalArgumentException("At least one agent is required"); - } - - if (batchSize <= 0) { - batchSize = 1; - } - final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir; - - final StringBuilder sb = new StringBuilder("FlumePersistent["); - boolean first = true; - for (final Agent agent : agents) { - if (!first) { - sb.append(','); - } - sb.append(agent.getHost()).append(':').append(agent.getPort()); - first = false; - } - sb.append(']'); - sb.append(' ').append(dataDirectory); - return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries, - connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties)); - } - - @Override - public void send(final Event event) { - if (worker.isShutdown()) { - throw new LoggingException("Unable to record event"); - } - - final Map<String, String> headers = event.getHeaders(); - final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8); - try { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final DataOutputStream daos = new DataOutputStream(baos); - daos.writeInt(event.getBody().length); - daos.write(event.getBody(), 0, event.getBody().length); - daos.writeInt(event.getHeaders().size()); - for (final Map.Entry<String, String> entry : headers.entrySet()) { - daos.writeUTF(entry.getKey()); - daos.writeUTF(entry.getValue()); - } - byte[] eventData = baos.toByteArray(); - if (secretKey != null) { - final Cipher cipher = Cipher.getInstance("AES"); - cipher.init(Cipher.ENCRYPT_MODE, secretKey); - eventData = cipher.doFinal(eventData); - } - final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database, - gate, dbCount, getBatchSize(), lockTimeoutRetryCount)); - boolean interrupted = false; - int ieCount = 0; - do { - try { - future.get(); - } catch (final InterruptedException ie) { - interrupted = true; - ++ieCount; - } - } while (interrupted && ieCount <= 1); - - } catch (final Exception ex) { - throw new LoggingException("Exception occurred writing log event", ex); - } - } - - @Override - protected void releaseSub() { - LOGGER.debug("Shutting down FlumePersistentManager"); - worker.shutdown(); - try { - worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND); - } catch (final InterruptedException ie) { - // Ignore the exception and shutdown. - } - threadPool.shutdown(); - try { - threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS); - } catch (final InterruptedException e) { - logWarn("PersistentManager Thread pool failed to shut down", e); - } - try { - worker.join(); - } catch (final InterruptedException ex) { - logDebug("interrupted while waiting for worker to complete", ex); - } - try { - LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig())); - database.close(); - } catch (final Exception ex) { - logWarn("failed to close database", ex); - } - try { - environment.cleanLog(); - environment.close(); - } catch (final Exception ex) { - logWarn("failed to close environment", ex); - } - super.releaseSub(); - } - - private void doSend(final SimpleEvent event) { - LOGGER.debug("Sending event to Flume"); - super.send(event); - } - - /** - * Thread for writing to Berkeley DB to avoid having interrupts close the database. - */ - private static class BDBWriter implements Callable<Integer> { - private final byte[] eventData; - private final byte[] keyData; - private final Environment environment; - private final Database database; - private final Gate gate; - private final AtomicLong dbCount; - private final long batchSize; - private final int lockTimeoutRetryCount; - - public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment, - final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize, - final int lockTimeoutRetryCount) { - this.keyData = keyData; - this.eventData = eventData; - this.environment = environment; - this.database = database; - this.gate = gate; - this.dbCount = dbCount; - this.batchSize = batchSize; - this.lockTimeoutRetryCount = lockTimeoutRetryCount; - } - - @Override - public Integer call() throws Exception { - final DatabaseEntry key = new DatabaseEntry(keyData); - final DatabaseEntry data = new DatabaseEntry(eventData); - Exception exception = null; - for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { - Transaction txn = null; - try { - txn = environment.beginTransaction(null, null); - try { - database.put(txn, key, data); - txn.commit(); - txn = null; - if (dbCount.incrementAndGet() >= batchSize) { - gate.open(); - } - exception = null; - break; - } catch (final LockConflictException lce) { - exception = lce; - // Fall through and retry. - } catch (final Exception ex) { - if (txn != null) { - txn.abort(); - } - throw ex; - } finally { - if (txn != null) { - txn.abort(); - txn = null; - } - } - } catch (final LockConflictException lce) { - exception = lce; - if (txn != null) { - try { - txn.abort(); - txn = null; - } catch (final Exception ex) { - LOGGER.trace("Ignoring exception while aborting transaction during lock conflict."); - } - } - - } - try { - Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); - } catch (final InterruptedException ie) { - // Ignore the error - } - } - if (exception != null) { - throw exception; - } - return eventData.length; - } - } - - /** - * Factory data. - */ - private static class FactoryData { - private final String name; - private final Agent[] agents; - private final int batchSize; - private final String dataDir; - private final int retries; - private final int connectionTimeout; - private final int requestTimeout; - private final int delayMillis; - private final int lockTimeoutRetryCount; - private final Property[] properties; - - /** - * Constructor. - * @param name The name of the Appender. - * @param agents The agents. - * @param batchSize The number of events to include in a batch. - * @param dataDir The directory for data. - */ - public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries, - final int connectionTimeout, final int requestTimeout, final int delayMillis, - final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) { - this.name = name; - this.agents = agents; - this.batchSize = batchSize; - this.dataDir = dataDir; - this.retries = retries; - this.connectionTimeout = connectionTimeout; - this.requestTimeout = requestTimeout; - this.delayMillis = delayMillis; - this.lockTimeoutRetryCount = lockTimeoutRetryCount; - this.properties = properties; - } - } - - /** - * Avro Manager Factory. - */ - private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> { - - /** - * Create the FlumeKratiManager. - * @param name The name of the entity to manage. - * @param data The data required to create the entity. - * @return The FlumeKratiManager. - */ - @Override - public FlumePersistentManager createManager(final String name, final FactoryData data) { - SecretKey secretKey = null; - Database database = null; - Environment environment = null; - - final Map<String, String> properties = new HashMap<>(); - if (data.properties != null) { - for (final Property property : data.properties) { - properties.put(property.getName(), property.getValue()); - } - } - - try { - final File dir = new File(data.dataDir); - FileUtils.mkdir(dir, true); - final EnvironmentConfig dbEnvConfig = new EnvironmentConfig(); - dbEnvConfig.setTransactional(true); - dbEnvConfig.setAllowCreate(true); - dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS); - environment = new Environment(dir, dbEnvConfig); - final DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - database = environment.openDatabase(null, name, dbConfig); - } catch (final Exception ex) { - LOGGER.error("Could not create FlumePersistentManager", ex); - // For consistency, close database as well as environment even though it should never happen since the - // database is that last thing in the block above, but this does guard against a future line being - // inserted at the end that would bomb (like some debug logging). - if (database != null) { - database.close(); - database = null; - } - if (environment != null) { - environment.close(); - environment = null; - } - return null; - } - - try { - String key = null; - for (final Map.Entry<String, String> entry : properties.entrySet()) { - if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) { - key = entry.getValue(); - break; - } - } - if (key != null) { - final PluginManager manager = new PluginManager("KeyProvider"); - manager.collectPlugins(); - final Map<String, PluginType<?>> plugins = manager.getPlugins(); - if (plugins != null) { - boolean found = false; - for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) { - if (entry.getKey().equalsIgnoreCase(key)) { - found = true; - final Class<?> cl = entry.getValue().getPluginClass(); - try { - final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance(); - secretKey = provider.getSecretKey(); - LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName()); - } catch (final Exception ex) { - LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled", - cl.getName()); - } - break; - } - } - if (!found) { - LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); - } - } else { - LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); - } - } - } catch (final Exception ex) { - LOGGER.warn("Error setting up encryption - encryption will be disabled", ex); - } - return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries, - data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey, - data.lockTimeoutRetryCount); - } - } - - /** - * Thread that sends data to Flume and pulls it from Berkeley DB. - */ - private static class WriterThread extends Thread { - private volatile boolean shutdown = false; - private final Database database; - private final Environment environment; - private final FlumePersistentManager manager; - private final Gate gate; - private final SecretKey secretKey; - private final int batchSize; - private final AtomicLong dbCounter; - private final int lockTimeoutRetryCount; - - public WriterThread(final Database database, final Environment environment, - final FlumePersistentManager manager, final Gate gate, final int batchsize, - final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) { - this.database = database; - this.environment = environment; - this.manager = manager; - this.gate = gate; - this.batchSize = batchsize; - this.secretKey = secretKey; - this.setDaemon(true); - this.dbCounter = dbCount; - this.lockTimeoutRetryCount = lockTimeoutRetryCount; - } - - public void shutdown() { - LOGGER.debug("Writer thread shutting down"); - this.shutdown = true; - gate.open(); - } - - public boolean isShutdown() { - return shutdown; - } - - @Override - public void run() { - LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis()); - long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis(); - while (!shutdown) { - final long nowMillis = System.currentTimeMillis(); - final long dbCount = database.count(); - dbCounter.set(dbCount); - if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) { - nextBatchMillis = nowMillis + manager.getDelayMillis(); - try { - boolean errors = false; - final DatabaseEntry key = new DatabaseEntry(); - final DatabaseEntry data = new DatabaseEntry(); - - gate.close(); - OperationStatus status; - if (batchSize > 1) { - try { - errors = sendBatch(key, data); - } catch (final Exception ex) { - break; - } - } else { - Exception exception = null; - for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { - exception = null; - Transaction txn = null; - Cursor cursor = null; - try { - txn = environment.beginTransaction(null, null); - cursor = database.openCursor(txn, null); - try { - status = cursor.getFirst(key, data, LockMode.RMW); - while (status == OperationStatus.SUCCESS) { - final SimpleEvent event = createEvent(data); - if (event != null) { - try { - manager.doSend(event); - } catch (final Exception ioe) { - errors = true; - LOGGER.error("Error sending event", ioe); - break; - } - try { - cursor.delete(); - } catch (final Exception ex) { - LOGGER.error("Unable to delete event", ex); - } - } - status = cursor.getNext(key, data, LockMode.RMW); - } - if (cursor != null) { - cursor.close(); - cursor = null; - } - txn.commit(); - txn = null; - dbCounter.decrementAndGet(); - exception = null; - break; - } catch (final LockConflictException lce) { - exception = lce; - // Fall through and retry. - } catch (final Exception ex) { - LOGGER.error("Error reading or writing to database", ex); - shutdown = true; - break; - } finally { - if (cursor != null) { - cursor.close(); - cursor = null; - } - if (txn != null) { - txn.abort(); - txn = null; - } - } - } catch (final LockConflictException lce) { - exception = lce; - if (cursor != null) { - try { - cursor.close(); - cursor = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception closing cursor during lock conflict."); - } - } - if (txn != null) { - try { - txn.abort(); - txn = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception aborting tx during lock conflict."); - } - } - } - try { - Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); - } catch (final InterruptedException ie) { - // Ignore the error - } - } - if (exception != null) { - LOGGER.error("Unable to read or update data base", exception); - } - } - if (errors) { - Thread.sleep(manager.getDelayMillis()); - continue; - } - } catch (final Exception ex) { - LOGGER.warn("WriterThread encountered an exception. Continuing.", ex); - } - } else { - if (nextBatchMillis <= nowMillis) { - nextBatchMillis = nowMillis + manager.getDelayMillis(); - } - try { - final long interval = nextBatchMillis - nowMillis; - gate.waitForOpen(interval); - } catch (final InterruptedException ie) { - LOGGER.warn("WriterThread interrupted, continuing"); - } catch (final Exception ex) { - LOGGER.error("WriterThread encountered an exception waiting for work", ex); - break; - } - } - } - - if (batchSize > 1 && database.count() > 0) { - final DatabaseEntry key = new DatabaseEntry(); - final DatabaseEntry data = new DatabaseEntry(); - try { - sendBatch(key, data); - } catch (final Exception ex) { - LOGGER.warn("Unable to write final batch"); - } - } - LOGGER.trace("WriterThread exiting"); - } - - private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception { - boolean errors = false; - OperationStatus status; - Cursor cursor = null; - try { - final BatchEvent batch = new BatchEvent(); - for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { - try { - cursor = database.openCursor(null, CursorConfig.DEFAULT); - status = cursor.getFirst(key, data, null); - - for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) { - final SimpleEvent event = createEvent(data); - if (event != null) { - batch.addEvent(event); - } - status = cursor.getNext(key, data, null); - } - break; - } catch (final LockConflictException lce) { - if (cursor != null) { - try { - cursor.close(); - cursor = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception closing cursor during lock conflict."); - } - } - } - } - - try { - manager.send(batch); - } catch (final Exception ioe) { - LOGGER.error("Error sending events", ioe); - errors = true; - } - if (!errors) { - if (cursor != null) { - cursor.close(); - cursor = null; - } - Transaction txn = null; - Exception exception = null; - for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { - try { - txn = environment.beginTransaction(null, null); - try { - for (final Event event : batch.getEvents()) { - try { - final Map<String, String> headers = event.getHeaders(); - key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8)); - database.delete(txn, key); - } catch (final Exception ex) { - LOGGER.error("Error deleting key from database", ex); - } - } - txn.commit(); - long count = dbCounter.get(); - while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) { - count = dbCounter.get(); - } - exception = null; - break; - } catch (final LockConflictException lce) { - exception = lce; - if (cursor != null) { - try { - cursor.close(); - cursor = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception closing cursor during lock conflict."); - } - } - if (txn != null) { - try { - txn.abort(); - txn = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception aborting transaction during lock conflict."); - } - } - } catch (final Exception ex) { - LOGGER.error("Unable to commit transaction", ex); - if (txn != null) { - txn.abort(); - } - } - } catch (final LockConflictException lce) { - exception = lce; - if (cursor != null) { - try { - cursor.close(); - cursor = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception closing cursor during lock conflict."); - } - } - if (txn != null) { - try { - txn.abort(); - txn = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception aborting transaction during lock conflict."); - } - } - } finally { - if (cursor != null) { - cursor.close(); - cursor = null; - } - if (txn != null) { - txn.abort(); - txn = null; - } - } - try { - Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); - } catch (final InterruptedException ie) { - // Ignore the error - } - } - if (exception != null) { - LOGGER.error("Unable to delete events from data base", exception); - } - } - } catch (final Exception ex) { - LOGGER.error("Error reading database", ex); - shutdown = true; - throw ex; - } finally { - if (cursor != null) { - cursor.close(); - } - } - - return errors; - } - - private SimpleEvent createEvent(final DatabaseEntry data) { - final SimpleEvent event = new SimpleEvent(); - try { - byte[] eventData = data.getData(); - if (secretKey != null) { - final Cipher cipher = Cipher.getInstance("AES"); - cipher.init(Cipher.DECRYPT_MODE, secretKey); - eventData = cipher.doFinal(eventData); - } - final ByteArrayInputStream bais = new ByteArrayInputStream(eventData); - final DataInputStream dais = new DataInputStream(bais); - int length = dais.readInt(); - final byte[] bytes = new byte[length]; - dais.read(bytes, 0, length); - event.setBody(bytes); - length = dais.readInt(); - final Map<String, String> map = new HashMap<>(length); - for (int i = 0; i < length; ++i) { - final String headerKey = dais.readUTF(); - final String value = dais.readUTF(); - map.put(headerKey, value); - } - event.setHeaders(map); - return event; - } catch (final Exception ex) { - LOGGER.error("Error retrieving event", ex); - return null; - } - } - - } - - /** - * Factory that creates Daemon threads that can be properly shut down. - */ - private static class DaemonThreadFactory implements ThreadFactory { - private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1); - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final String namePrefix; - - public DaemonThreadFactory() { - final SecurityManager securityManager = System.getSecurityManager(); - group = securityManager != null ? securityManager.getThreadGroup() : - Thread.currentThread().getThreadGroup(); - namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-"; - } - - @Override - public Thread newThread(final Runnable r) { - final Thread thread = new Log4jThread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); - thread.setDaemon(true); - if (thread.getPriority() != Thread.NORM_PRIORITY) { - thread.setPriority(Thread.NORM_PRIORITY); - } - return thread; - } - } - - /** - * An internal class. - */ - private static class Gate { - - private boolean isOpen = false; - - public boolean isOpen() { - return isOpen; - } - - public synchronized void open() { - isOpen = true; - notifyAll(); - } - - public synchronized void close() { - isOpen = false; - } - - public synchronized void waitForOpen(final long timeout) throws InterruptedException { - wait(timeout); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.flume.appender; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.crypto.Cipher; +import javax.crypto.SecretKey; + +import org.apache.flume.Event; +import org.apache.flume.event.SimpleEvent; +import org.apache.logging.log4j.LoggingException; +import org.apache.logging.log4j.core.appender.ManagerFactory; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.config.plugins.util.PluginManager; +import org.apache.logging.log4j.core.config.plugins.util.PluginType; +import org.apache.logging.log4j.core.util.FileUtils; +import org.apache.logging.log4j.core.util.Log4jThread; +import org.apache.logging.log4j.core.util.SecretKeyProvider; +import org.apache.logging.log4j.util.Strings; + +import com.sleepycat.je.Cursor; +import com.sleepycat.je.CursorConfig; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.LockConflictException; +import com.sleepycat.je.LockMode; +import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.StatsConfig; +import com.sleepycat.je.Transaction; + +/** + * Manager that persists data to Berkeley DB before passing it on to Flume. + */ +public class FlumePersistentManager extends FlumeAvroManager { + + /** Attribute name for the key provider. */ + public static final String KEY_PROVIDER = "keyProvider"; + + private static final Charset UTF8 = StandardCharsets.UTF_8; + + private static final String DEFAULT_DATA_DIR = ".log4j/flumeData"; + + private static final int SHUTDOWN_WAIT = 60; + + private static final int MILLIS_PER_SECOND = 1000; + + private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500; + + private static BDBManagerFactory factory = new BDBManagerFactory(); + + private final Database database; + + private final Environment environment; + + private final WriterThread worker; + + private final Gate gate = new Gate(); + + private final SecretKey secretKey; + + private final int lockTimeoutRetryCount; + + private final ExecutorService threadPool; + + private final AtomicLong dbCount = new AtomicLong(); + + /** + * Constructor + * @param name The unique name of this manager. + * @param shortName Original name for the Manager. + * @param agents An array of Agents. + * @param batchSize The number of events to include in a batch. + * @param retries The number of times to retry connecting before giving up. + * @param connectionTimeout The amount of time to wait for a connection to be established. + * @param requestTimeout The amount of time to wair for a response to a request. + * @param delay The amount of time to wait between retries. + * @param database The database to write to. + * @param environment The database environment. + * @param secretKey The SecretKey to use for encryption. + * @param lockTimeoutRetryCount The number of times to retry a lock timeout. + */ + protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents, + final int batchSize, final int retries, final int connectionTimeout, + final int requestTimeout, final int delay, final Database database, + final Environment environment, final SecretKey secretKey, + final int lockTimeoutRetryCount) { + super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout); + this.database = database; + this.environment = environment; + dbCount.set(database.count()); + this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount, + lockTimeoutRetryCount); + this.worker.start(); + this.secretKey = secretKey; + this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory()); + this.lockTimeoutRetryCount = lockTimeoutRetryCount; + } + + + /** + * Returns a FlumeAvroManager. + * @param name The name of the manager. + * @param agents The agents to use. + * @param properties Properties to pass to the Manager. + * @param batchSize The number of events to include in a batch. + * @param retries The number of times to retry connecting before giving up. + * @param connectionTimeout The amount of time to wait to establish a connection. + * @param requestTimeout The amount of time to wait for a response to a request. + * @param delayMillis Amount of time to delay before delivering a batch. + * @param lockTimeoutRetryCount The number of times to retry after a lock timeout. + * @param dataDir The location of the Berkeley database. + * @return A FlumeAvroManager. + */ + public static FlumePersistentManager getManager(final String name, final Agent[] agents, + final Property[] properties, int batchSize, final int retries, + final int connectionTimeout, final int requestTimeout, + final int delayMillis, final int lockTimeoutRetryCount, + final String dataDir) { + if (agents == null || agents.length == 0) { + throw new IllegalArgumentException("At least one agent is required"); + } + + if (batchSize <= 0) { + batchSize = 1; + } + final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir; + + final StringBuilder sb = new StringBuilder("FlumePersistent["); + boolean first = true; + for (final Agent agent : agents) { + if (!first) { + sb.append(','); + } + sb.append(agent.getHost()).append(':').append(agent.getPort()); + first = false; + } + sb.append(']'); + sb.append(' ').append(dataDirectory); + return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries, + connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties)); + } + + @Override + public void send(final Event event) { + if (worker.isShutdown()) { + throw new LoggingException("Unable to record event"); + } + + final Map<String, String> headers = event.getHeaders(); + final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8); + try { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream daos = new DataOutputStream(baos); + daos.writeInt(event.getBody().length); + daos.write(event.getBody(), 0, event.getBody().length); + daos.writeInt(event.getHeaders().size()); + for (final Map.Entry<String, String> entry : headers.entrySet()) { + daos.writeUTF(entry.getKey()); + daos.writeUTF(entry.getValue()); + } + byte[] eventData = baos.toByteArray(); + if (secretKey != null) { + final Cipher cipher = Cipher.getInstance("AES"); + cipher.init(Cipher.ENCRYPT_MODE, secretKey); + eventData = cipher.doFinal(eventData); + } + final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database, + gate, dbCount, getBatchSize(), lockTimeoutRetryCount)); + boolean interrupted = false; + int ieCount = 0; + do { + try { + future.get(); + } catch (final InterruptedException ie) { + interrupted = true; + ++ieCount; + } + } while (interrupted && ieCount <= 1); + + } catch (final Exception ex) { + throw new LoggingException("Exception occurred writing log event", ex); + } + } + + @Override + protected void releaseSub() { + LOGGER.debug("Shutting down FlumePersistentManager"); + worker.shutdown(); + try { + worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND); + } catch (final InterruptedException ie) { + // Ignore the exception and shutdown. + } + threadPool.shutdown(); + try { + threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + logWarn("PersistentManager Thread pool failed to shut down", e); + } + try { + worker.join(); + } catch (final InterruptedException ex) { + logDebug("interrupted while waiting for worker to complete", ex); + } + try { + LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig())); + database.close(); + } catch (final Exception ex) { + logWarn("Failed to close database", ex); + } + try { + environment.cleanLog(); + environment.close(); + } catch (final Exception ex) { + logWarn("Failed to close environment", ex); + } + super.releaseSub(); + } + + private void doSend(final SimpleEvent event) { + LOGGER.debug("Sending event to Flume"); + super.send(event); + } + + /** + * Thread for writing to Berkeley DB to avoid having interrupts close the database. + */ + private static class BDBWriter implements Callable<Integer> { + private final byte[] eventData; + private final byte[] keyData; + private final Environment environment; + private final Database database; + private final Gate gate; + private final AtomicLong dbCount; + private final long batchSize; + private final int lockTimeoutRetryCount; + + public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment, + final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize, + final int lockTimeoutRetryCount) { + this.keyData = keyData; + this.eventData = eventData; + this.environment = environment; + this.database = database; + this.gate = gate; + this.dbCount = dbCount; + this.batchSize = batchSize; + this.lockTimeoutRetryCount = lockTimeoutRetryCount; + } + + @Override + public Integer call() throws Exception { + final DatabaseEntry key = new DatabaseEntry(keyData); + final DatabaseEntry data = new DatabaseEntry(eventData); + Exception exception = null; + for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { + Transaction txn = null; + try { + txn = environment.beginTransaction(null, null); + try { + database.put(txn, key, data); + txn.commit(); + txn = null; + if (dbCount.incrementAndGet() >= batchSize) { + gate.open(); + } + exception = null; + break; + } catch (final LockConflictException lce) { + exception = lce; + // Fall through and retry. + } catch (final Exception ex) { + if (txn != null) { + txn.abort(); + } + throw ex; + } finally { + if (txn != null) { + txn.abort(); + txn = null; + } + } + } catch (final LockConflictException lce) { + exception = lce; + if (txn != null) { + try { + txn.abort(); + txn = null; + } catch (final Exception ex) { + LOGGER.trace("Ignoring exception while aborting transaction during lock conflict."); + } + } + + } + try { + Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); + } catch (final InterruptedException ie) { + // Ignore the error + } + } + if (exception != null) { + throw exception; + } + return eventData.length; + } + } + + /** + * Factory data. + */ + private static class FactoryData { + private final String name; + private final Agent[] agents; + private final int batchSize; + private final String dataDir; + private final int retries; + private final int connectionTimeout; + private final int requestTimeout; + private final int delayMillis; + private final int lockTimeoutRetryCount; + private final Property[] properties; + + /** + * Constructor. + * @param name The name of the Appender. + * @param agents The agents. + * @param batchSize The number of events to include in a batch. + * @param dataDir The directory for data. + */ + public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries, + final int connectionTimeout, final int requestTimeout, final int delayMillis, + final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) { + this.name = name; + this.agents = agents; + this.batchSize = batchSize; + this.dataDir = dataDir; + this.retries = retries; + this.connectionTimeout = connectionTimeout; + this.requestTimeout = requestTimeout; + this.delayMillis = delayMillis; + this.lockTimeoutRetryCount = lockTimeoutRetryCount; + this.properties = properties; + } + } + + /** + * Avro Manager Factory. + */ + private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> { + + /** + * Create the FlumeKratiManager. + * @param name The name of the entity to manage. + * @param data The data required to create the entity. + * @return The FlumeKratiManager. + */ + @Override + public FlumePersistentManager createManager(final String name, final FactoryData data) { + SecretKey secretKey = null; + Database database = null; + Environment environment = null; + + final Map<String, String> properties = new HashMap<>(); + if (data.properties != null) { + for (final Property property : data.properties) { + properties.put(property.getName(), property.getValue()); + } + } + + try { + final File dir = new File(data.dataDir); + FileUtils.mkdir(dir, true); + final EnvironmentConfig dbEnvConfig = new EnvironmentConfig(); + dbEnvConfig.setTransactional(true); + dbEnvConfig.setAllowCreate(true); + dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS); + environment = new Environment(dir, dbEnvConfig); + final DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + database = environment.openDatabase(null, name, dbConfig); + } catch (final Exception ex) { + LOGGER.error("Could not create FlumePersistentManager", ex); + // For consistency, close database as well as environment even though it should never happen since the + // database is that last thing in the block above, but this does guard against a future line being + // inserted at the end that would bomb (like some debug logging). + if (database != null) { + database.close(); + database = null; + } + if (environment != null) { + environment.close(); + environment = null; + } + return null; + } + + try { + String key = null; + for (final Map.Entry<String, String> entry : properties.entrySet()) { + if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) { + key = entry.getValue(); + break; + } + } + if (key != null) { + final PluginManager manager = new PluginManager("KeyProvider"); + manager.collectPlugins(); + final Map<String, PluginType<?>> plugins = manager.getPlugins(); + if (plugins != null) { + boolean found = false; + for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) { + if (entry.getKey().equalsIgnoreCase(key)) { + found = true; + final Class<?> cl = entry.getValue().getPluginClass(); + try { + final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance(); + secretKey = provider.getSecretKey(); + LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName()); + } catch (final Exception ex) { + LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled", + cl.getName()); + } + break; + } + } + if (!found) { + LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); + } + } else { + LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); + } + } + } catch (final Exception ex) { + LOGGER.warn("Error setting up encryption - encryption will be disabled", ex); + } + return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries, + data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey, + data.lockTimeoutRetryCount); + } + } + + /** + * Thread that sends data to Flume and pulls it from Berkeley DB. + */ + private static class WriterThread extends Thread { + private volatile boolean shutdown = false; + private final Database database; + private final Environment environment; + private final FlumePersistentManager manager; + private final Gate gate; + private final SecretKey secretKey; + private final int batchSize; + private final AtomicLong dbCounter; + private final int lockTimeoutRetryCount; + + public WriterThread(final Database database, final Environment environment, + final FlumePersistentManager manager, final Gate gate, final int batchsize, + final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) { + this.database = database; + this.environment = environment; + this.manager = manager; + this.gate = gate; + this.batchSize = batchsize; + this.secretKey = secretKey; + this.setDaemon(true); + this.dbCounter = dbCount; + this.lockTimeoutRetryCount = lockTimeoutRetryCount; + } + + public void shutdown() { + LOGGER.debug("Writer thread shutting down"); + this.shutdown = true; + gate.open(); + } + + public boolean isShutdown() { + return shutdown; + } + + @Override + public void run() { + LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis()); + long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis(); + while (!shutdown) { + final long nowMillis = System.currentTimeMillis(); + final long dbCount = database.count(); + dbCounter.set(dbCount); + if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) { + nextBatchMillis = nowMillis + manager.getDelayMillis(); + try { + boolean errors = false; + final DatabaseEntry key = new DatabaseEntry(); + final DatabaseEntry data = new DatabaseEntry(); + + gate.close(); + OperationStatus status; + if (batchSize > 1) { + try { + errors = sendBatch(key, data); + } catch (final Exception ex) { + break; + } + } else { + Exception exception = null; + for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { + exception = null; + Transaction txn = null; + Cursor cursor = null; + try { + txn = environment.beginTransaction(null, null); + cursor = database.openCursor(txn, null); + try { + status = cursor.getFirst(key, data, LockMode.RMW); + while (status == OperationStatus.SUCCESS) { + final SimpleEvent event = createEvent(data); + if (event != null) { + try { + manager.doSend(event); + } catch (final Exception ioe) { + errors = true; + LOGGER.error("Error sending event", ioe); + break; + } + try { + cursor.delete(); + } catch (final Exception ex) { + LOGGER.error("Unable to delete event", ex); + } + } + status = cursor.getNext(key, data, LockMode.RMW); + } + if (cursor != null) { + cursor.close(); + cursor = null; + } + txn.commit(); + txn = null; + dbCounter.decrementAndGet(); + exception = null; + break; + } catch (final LockConflictException lce) { + exception = lce; + // Fall through and retry. + } catch (final Exception ex) { + LOGGER.error("Error reading or writing to database", ex); + shutdown = true; + break; + } finally { + if (cursor != null) { + cursor.close(); + cursor = null; + } + if (txn != null) { + txn.abort(); + txn = null; + } + } + } catch (final LockConflictException lce) { + exception = lce; + if (cursor != null) { + try { + cursor.close(); + cursor = null; + } catch (final Exception ex) { + LOGGER.trace("Ignored exception closing cursor during lock conflict."); + } + } + if (txn != null) { + try { + txn.abort(); + txn = null; + } catch (final Exception ex) { + LOGGER.trace("Ignored exception aborting tx during lock conflict."); + } + } + } + try { + Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); + } catch (final InterruptedException ie) { + // Ignore the error + } + } + if (exception != null) { + LOGGER.error("Unable to read or update data base", exception); + } + } + if (errors) { + Thread.sleep(manager.getDelayMillis()); + continue; + } + } catch (final Exception ex) { + LOGGER.warn("WriterThread encountered an exception. Continuing.", ex); + } + } else { + if (nextBatchMillis <= nowMillis) { + nextBatchMillis = nowMillis + manager.getDelayMillis(); + } + try { + final long interval = nextBatchMillis - nowMillis; + gate.waitForOpen(interval); + } catch (final InterruptedException ie) { + LOGGER.warn("WriterThread interrupted, continuing"); + } catch (final Exception ex) { + LOGGER.error("WriterThread encountered an exception waiting for work", ex); + break; + } + } + } + + if (batchSize > 1 && database.count() > 0) { + final DatabaseEntry key = new DatabaseEntry(); + final DatabaseEntry data = new DatabaseEntry(); + try { + sendBatch(key, data); + } catch (final Exception ex) { + LOGGER.warn("Unable to write final batch"); + } + } + LOGGER.trace("WriterThread exiting"); + } + + private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception { + boolean errors = false; + OperationStatus status; + Cursor cursor = null; + try { + final BatchEvent batch = new BatchEvent(); + for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { + try { + cursor = database.openCursor(null, CursorConfig.DEFAULT); + status = cursor.getFirst(key, data, null); + + for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) { + final SimpleEvent event = createEvent(data); + if (event != null) { + batch.addEvent(event); + } + status = cursor.getNext(key, data, null); + } + break; + } catch (final LockConflictException lce) { + if (cursor != null) { + try { + cursor.close(); + cursor = null; + } catch (final Exception ex) { + LOGGER.trace("Ignored exception closing cursor during lock conflict."); + } + } + } + } + + try { + manager.send(batch); + } catch (final Exception ioe) { + LOGGER.error("Error sending events", ioe); + errors = true; + } + if (!errors) { + if (cursor != null) { + cursor.close(); + cursor = null; + } + Transaction txn = null; + Exception exception = null; + for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { + try { + txn = environment.beginTransaction(null, null); + try { + for (final Event event : batch.getEvents()) { + try { + final Map<String, String> headers = event.getHeaders(); + key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8)); + database.delete(txn, key); + } catch (final Exception ex) { + LOGGER.error("Error deleting key from database", ex); + } + } + txn.commit(); + long count = dbCounter.get(); + while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) { + count = dbCounter.get(); + } + exception = null; + break; + } catch (final LockConflictException lce) { + exception = lce; + if (cursor != null) { + try { + cursor.close(); + cursor = null; + } catch (final Exception ex) { + LOGGER.trace("Ignored exception closing cursor during lock conflict."); + } + } + if (txn != null) { + try { + txn.abort(); + txn = null; + } catch (final Exception ex) { + LOGGER.trace("Ignored exception aborting transaction during lock conflict."); + } + } + } catch (final Exception ex) { + LOGGER.error("Unable to commit transaction", ex); + if (txn != null) { + txn.abort(); + } + } + } catch (final LockConflictException lce) { + exception = lce; + if (cursor != null) { + try { + cursor.close(); + cursor = null; + } catch (final Exception ex) { + LOGGER.trace("Ignored exception closing cursor during lock conflict."); + } + } + if (txn != null) { + try { + txn.abort(); + txn = null; + } catch (final Exception ex) { + LOGGER.trace("Ignored exception aborting transaction during lock conflict."); + } + } + } finally { + if (cursor != null) { + cursor.close(); + cursor = null; + } + if (txn != null) { + txn.abort(); + txn = null; + } + } + try { + Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); + } catch (final InterruptedException ie) { + // Ignore the error + } + } + if (exception != null) { + LOGGER.error("Unable to delete events from data base", exception); + } + } + } catch (final Exception ex) { + LOGGER.error("Error reading database", ex); + shutdown = true; + throw ex; + } finally { + if (cursor != null) { + cursor.close(); + } + } + + return errors; + } + + private SimpleEvent createEvent(final DatabaseEntry data) { + final SimpleEvent event = new SimpleEvent(); + try { + byte[] eventData = data.getData(); + if (secretKey != null) { + final Cipher cipher = Cipher.getInstance("AES"); + cipher.init(Cipher.DECRYPT_MODE, secretKey); + eventData = cipher.doFinal(eventData); + } + final ByteArrayInputStream bais = new ByteArrayInputStream(eventData); + final DataInputStream dais = new DataInputStream(bais); + int length = dais.readInt(); + final byte[] bytes = new byte[length]; + dais.read(bytes, 0, length); + event.setBody(bytes); + length = dais.readInt(); + final Map<String, String> map = new HashMap<>(length); + for (int i = 0; i < length; ++i) { + final String headerKey = dais.readUTF(); + final String value = dais.readUTF(); + map.put(headerKey, value); + } + event.setHeaders(map); + return event; + } catch (final Exception ex) { + LOGGER.error("Error retrieving event", ex); + return null; + } + } + + } + + /** + * Factory that creates Daemon threads that can be properly shut down. + */ + private static class DaemonThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + public DaemonThreadFactory() { + final SecurityManager securityManager = System.getSecurityManager(); + group = securityManager != null ? securityManager.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-"; + } + + @Override + public Thread newThread(final Runnable r) { + final Thread thread = new Log4jThread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); + thread.setDaemon(true); + if (thread.getPriority() != Thread.NORM_PRIORITY) { + thread.setPriority(Thread.NORM_PRIORITY); + } + return thread; + } + } + + /** + * An internal class. + */ + private static class Gate { + + private boolean isOpen = false; + + public boolean isOpen() { + return isOpen; + } + + public synchronized void open() { + isOpen = true; + notifyAll(); + } + + public synchronized void close() { + isOpen = false; + } + + public synchronized void waitForOpen(final long timeout) throws InterruptedException { + wait(timeout); + } + } +}