Repository: logging-log4j2 Updated Branches: refs/heads/master d37276d47 -> 3cdccd03e
[LOG4J2-867] FlumeAppender: maxDelay not in seconds, but milliseconds. Rename internal ivars and pnames from delay to delayMillis. Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/3cdccd03 Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/3cdccd03 Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/3cdccd03 Branch: refs/heads/master Commit: 3cdccd03eccec6b274c7e554f403f27c7e1e79a6 Parents: d37276d Author: Gary Gregory <garydgreg...@gmail.com> Authored: Fri Oct 3 18:32:43 2014 -0400 Committer: Gary Gregory <garydgreg...@gmail.com> Committed: Fri Oct 3 18:32:43 2014 -0400 ---------------------------------------------------------------------- .../log4j/flume/appender/FlumeAppender.java | 8 ++--- .../flume/appender/FlumePersistentManager.java | 36 ++++++++++---------- 2 files changed, 22 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3cdccd03/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java ---------------------------------------------------------------------- diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java index a15e327..336a07f 100644 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java +++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java @@ -142,7 +142,7 @@ public final class FlumeAppender extends AbstractAppender implements FlumeEventF * 1000. * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000. * @param agentRetries The number of times to retry an agent before failing to the next agent. 
- * @param maxDelay The maximum number of seconds to wait for a complete batch. + * @param maxDelayMillis The maximum number of milliseconds to wait for a complete batch. * @param name The name of the Appender. * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise * they are propagated to the caller. @@ -169,7 +169,7 @@ public final class FlumeAppender extends AbstractAppender implements FlumeEventF @PluginAttribute("connectTimeout") final String connectionTimeout, @PluginAttribute("requestTimeout") final String requestTimeout, @PluginAttribute("agentRetries") final String agentRetries, - @PluginAttribute("maxDelay") final String maxDelay, + @PluginAttribute("maxDelay") final String maxDelayMillis, @PluginAttribute("name") final String name, @PluginAttribute("ignoreExceptions") final String ignore, @PluginAttribute("mdcExcludes") final String excludes, @@ -218,7 +218,7 @@ public final class FlumeAppender extends AbstractAppender implements FlumeEventF final int reqTimeout = Integers.parseInt(requestTimeout, 0); final int retries = Integers.parseInt(agentRetries, 0); final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT); - final int delay = Integers.parseInt(maxDelay, DEFAULT_MAX_DELAY); + final int delayMillis = Integers.parseInt(maxDelayMillis, DEFAULT_MAX_DELAY); if (layout == null) { final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER; @@ -251,7 +251,7 @@ public final class FlumeAppender extends AbstractAppender implements FlumeEventF agents = new Agent[] {Agent.createAgent(null, null)}; } manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries, - connectTimeout, reqTimeout, delay, lockTimeoutRetryCount, dataDir); + connectTimeout, reqTimeout, delayMillis, lockTimeoutRetryCount, dataDir); break; default: LOGGER.debug("No manager type specified.
Defaulting to AVRO"); http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3cdccd03/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java ---------------------------------------------------------------------- diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java index 237041f..6256f12 100644 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java +++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java @@ -90,7 +90,7 @@ public class FlumePersistentManager extends FlumeAvroManager { private final SecretKey secretKey; - private final int delay; + private final int delayMillis; private final int lockTimeoutRetryCount; @@ -119,7 +119,7 @@ public class FlumePersistentManager extends FlumeAvroManager { final Environment environment, final SecretKey secretKey, final int lockTimeoutRetryCount) { super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout); - this.delay = delay; + this.delayMillis = delay; this.database = database; this.environment = environment; dbCount.set(database.count()); @@ -141,7 +141,7 @@ public class FlumePersistentManager extends FlumeAvroManager { * @param retries The number of times to retry connecting before giving up. * @param connectionTimeout The amount of time to wait to establish a connection. * @param requestTimeout The amount of time to wait for a response to a request. - * @param delay Amount of time to delay before delivering a batch. + * @param delayMillis Amount of time to delay before delivering a batch. * @param lockTimeoutRetryCount The number of times to retry after a lock timeout. * @param dataDir The location of the Berkeley database. * @return A FlumeAvroManager. 
@@ -149,7 +149,7 @@ public class FlumePersistentManager extends FlumeAvroManager { public static FlumePersistentManager getManager(final String name, final Agent[] agents, final Property[] properties, int batchSize, final int retries, final int connectionTimeout, final int requestTimeout, - final int delay, final int lockTimeoutRetryCount, + final int delayMillis, final int lockTimeoutRetryCount, final String dataDir) { if (agents == null || agents.length == 0) { throw new IllegalArgumentException("At least one agent is required"); @@ -172,7 +172,7 @@ public class FlumePersistentManager extends FlumeAvroManager { sb.append(']'); sb.append(' ').append(dataDirectory); return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries, - connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties)); + connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties)); } @Override @@ -351,7 +351,7 @@ public class FlumePersistentManager extends FlumeAvroManager { private final int retries; private final int connectionTimeout; private final int requestTimeout; - private final int delay; + private final int delayMillis; private final int lockTimeoutRetryCount; private final Property[] properties; @@ -363,7 +363,7 @@ public class FlumePersistentManager extends FlumeAvroManager { * @param dataDir The directory for data. 
*/ public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries, - final int connectionTimeout, final int requestTimeout, final int delay, + final int connectionTimeout, final int requestTimeout, final int delayMillis, final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) { this.name = name; this.agents = agents; @@ -372,7 +372,7 @@ public class FlumePersistentManager extends FlumeAvroManager { this.retries = retries; this.connectionTimeout = connectionTimeout; this.requestTimeout = requestTimeout; - this.delay = delay; + this.delayMillis = delayMillis; this.lockTimeoutRetryCount = lockTimeoutRetryCount; this.properties = properties; } @@ -470,7 +470,7 @@ public class FlumePersistentManager extends FlumeAvroManager { LOGGER.warn("Error setting up encryption - encryption will be disabled", ex); } return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries, - data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey, + data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey, data.lockTimeoutRetryCount); } } @@ -515,14 +515,14 @@ public class FlumePersistentManager extends FlumeAvroManager { @Override public void run() { - LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay); - long nextBatch = System.currentTimeMillis() + manager.delay; + LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.delayMillis); + long nextBatchMillis = System.currentTimeMillis() + manager.delayMillis; while (!shutdown) { - final long now = System.currentTimeMillis(); + final long nowMillis = System.currentTimeMillis(); final long dbCount = database.count(); dbCounter.set(dbCount); - if (dbCount >= batchSize || dbCount > 0 && nextBatch <= now) { - nextBatch = now + manager.delay; + if (dbCount >= batchSize || dbCount > 0 && 
nextBatchMillis <= nowMillis) { + nextBatchMillis = nowMillis + manager.delayMillis; try { boolean errors = false; final DatabaseEntry key = new DatabaseEntry(); @@ -621,18 +621,18 @@ public class FlumePersistentManager extends FlumeAvroManager { } } if (errors) { - Thread.sleep(manager.delay); + Thread.sleep(manager.delayMillis); continue; } } catch (final Exception ex) { LOGGER.warn("WriterThread encountered an exception. Continuing.", ex); } } else { - if (nextBatch <= now) { - nextBatch = now + manager.delay; + if (nextBatchMillis <= nowMillis) { + nextBatchMillis = nowMillis + manager.delayMillis; } try { - final long interval = nextBatch - now; + final long interval = nextBatchMillis - nowMillis; gate.waitForOpen(interval); } catch (final InterruptedException ie) { LOGGER.warn("WriterThread interrupted, continuing");