Repository: flume
Updated Branches:
  refs/heads/trunk a76e2e9f2 -> 20338fc67


FLUME-3227 Add Rate Limiter to StressSource

Currently the StressSource just runs flat out (on modern hardware it
can generate 20M events per second with no problems),
relying on backpressure from the channel to regulate itself.

Adds a shaded version of the Guava rate limiter and includes unit tests.

This closes #203

Reviewers: Ferenc Szabo, Endre Major, Peter Turcsanyi

(Tristan Stevens via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/20338fc6
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/20338fc6
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/20338fc6

Branch: refs/heads/trunk
Commit: 20338fc6710487fda541810cb717eaf4482c4e7c
Parents: a76e2e9
Author: Tristan Stevens <[email protected]>
Authored: Fri Aug 17 13:43:01 2018 +0200
Committer: Ferenc Szabo <[email protected]>
Committed: Fri Aug 17 13:43:01 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flume/source/StressSource.java   |  18 +
 .../flume/source/shaded/guava/RateLimiter.java  | 435 +++++++++++++++++
 .../source/shaded/guava/SmoothRateLimiter.java  | 389 +++++++++++++++
 .../flume/source/shaded/guava/Stopwatch.java    | 267 +++++++++++
 .../source/shaded/guava/Uninterruptibles.java   | 332 +++++++++++++
 .../apache/flume/source/TestStressSource.java   | 100 ++++
 .../source/shaded/guava/TestRateLimiter.java    | 468 +++++++++++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   1 +
 8 files changed, 2010 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/20338fc6/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
index aa95294..f37174a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.flume.source.shaded.guava.RateLimiter;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
@@ -66,6 +67,7 @@ public class StressSource extends AbstractPollableSource 
implements Configurable
   private Event event;
   private List<Event> eventBatchList;
   private List<Event> eventBatchListToProcess;
+  private RateLimiter limiter;
 
   public StressSource() {
     counterGroup = new CounterGroup();
@@ -89,6 +91,13 @@ public class StressSource extends AbstractPollableSource 
implements Configurable
     /* Size of events to be generated. */
     int size = context.getInteger("size", 500);
 
+    int rateLimit = context.getInteger("maxEventsPerSecond", 0);
+    if (rateLimit > 0) {
+      limiter = RateLimiter.create(rateLimit);
+    } else {
+      limiter = null;
+    }
+
     prepEventData(size);
   }
 
@@ -123,6 +132,9 @@ public class StressSource extends AbstractPollableSource 
implements Configurable
       lastSent = batchSize;
 
       if (batchSize == 1) {
+        if (limiter != null) {
+          limiter.acquire();
+        }
         getChannelProcessor().processEvent(event);
       } else {
         long eventsLeft = maxTotalEvents - totalEventSent;
@@ -133,6 +145,12 @@ public class StressSource extends AbstractPollableSource 
implements Configurable
           eventBatchListToProcess = eventBatchList;
         }
         lastSent = eventBatchListToProcess.size();
+
+        if (limiter != null) {
+          //Cast is safe because eventsLeft must be <= batchSize which is an 
int
+          limiter.acquire((int)lastSent);
+        }
+
         getChannelProcessor().processEventBatch(eventBatchListToProcess);
       }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/20338fc6/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/RateLimiter.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/RateLimiter.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/RateLimiter.java
new file mode 100644
index 0000000..c00a106
--- /dev/null
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/RateLimiter.java
@@ -0,0 +1,435 @@
+package org.apache.flume.source.shaded.guava;
+/*
+ * Copyright (C) 2012 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.Math.max;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flume.source.shaded.guava.Stopwatch;
+import org.apache.flume.source.shaded.guava.SmoothRateLimiter.SmoothBursty;
+import org.apache.flume.source.shaded.guava.SmoothRateLimiter.SmoothWarmingUp;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A rate limiter. Conceptually, a rate limiter distributes permits at a
+ * configurable rate. Each {@link #acquire()} blocks if necessary until a 
permit is
+ * available, and then takes it. Once acquired, permits need not be released.
+ *
+ * <p>Rate limiters are often used to restrict the rate at which some
+ * physical or logical resource is accessed. This is in contrast to {@link
+ * java.util.concurrent.Semaphore} which restricts the number of concurrent
+ * accesses instead of the rate (note though that concurrency and rate are 
closely related,
+ * e.g. see <a href="http://en.wikipedia.org/wiki/Little's_law">Little's 
Law</a>).
+ *
+ * <p>A {@code RateLimiter} is defined primarily by the rate at which permits
+ * are issued. Absent additional configuration, permits will be distributed at 
a
+ * fixed rate, defined in terms of permits per second. Permits will be 
distributed
+ * smoothly, with the delay between individual permits being adjusted to ensure
+ * that the configured rate is maintained.
+ *
+ * <p>It is possible to configure a {@code RateLimiter} to have a warmup
+ * period during which time the permits issued each second steadily increases 
until
+ * it hits the stable rate.
+ *
+ * <p>As an example, imagine that we have a list of tasks to execute, but we 
don't want to
+ * submit more than 2 per second:
+ *<pre>  {@code
+ *  final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 
permits per second"
+ *  void submitTasks(List<Runnable> tasks, Executor executor) {
+ *    for (Runnable task : tasks) {
+ *      rateLimiter.acquire(); // may wait
+ *      executor.execute(task);
+ *    }
+ *  }
+ *}</pre>
+ *
+ * <p>As another example, imagine that we produce a stream of data, and we 
want to cap it
+ * at 5kb per second. This could be accomplished by requiring a permit per 
byte, and specifying
+ * a rate of 5000 permits per second:
+ *<pre>  {@code
+ *  final RateLimiter rateLimiter = RateLimiter.create(5000.0); // rate = 5000 
permits per second
+ *  void submitPacket(byte[] packet) {
+ *    rateLimiter.acquire(packet.length);
+ *    networkService.send(packet);
+ *  }
+ *}</pre>
+ *
+ * <p>It is important to note that the number of permits requested <i>never</i>
+ * affect the throttling of the request itself (an invocation to {@code 
acquire(1)}
+ * and an invocation to {@code acquire(1000)} will result in exactly the same 
throttling, if any),
+ * but it affects the throttling of the <i>next</i> request. I.e., if an 
expensive task
+ * arrives at an idle RateLimiter, it will be granted immediately, but it is 
the <i>next</i>
+ * request that will experience extra throttling, thus paying for the cost of 
the expensive
+ * task.
+ *
+ * <p>Note: {@code RateLimiter} does not provide fairness guarantees.
+ *
+ * @author Dimitris Andreou
+ * @since 13.0
+ */
+// TODO(user): switch to nano precision. A natural unit of cost is "bytes", 
and a micro precision
+//     would mean a maximum rate of "1MB/s", which might be small in some 
cases.
+@ThreadSafe
+@Beta
+public abstract class RateLimiter {
+  /**
+   * Creates a {@code RateLimiter} with the specified stable throughput, given 
as
+   * "permits per second" (commonly referred to as <i>QPS</i>, queries per 
second).
+   *
+   * <p>The returned {@code RateLimiter} ensures that on average no more than 
{@code
+   * permitsPerSecond} are issued during any given second, with sustained 
requests
+   * being smoothly spread over each second. When the incoming request rate 
exceeds
+   * {@code permitsPerSecond} the rate limiter will release one permit every 
{@code
+   * (1.0 / permitsPerSecond)} seconds. When the rate limiter is unused,
+   * bursts of up to {@code permitsPerSecond} permits will be allowed, with 
subsequent
+   * requests being smoothly limited at the stable rate of {@code 
permitsPerSecond}.
+   *
+   * @param permitsPerSecond the rate of the returned {@code RateLimiter}, 
measured in
+   *        how many permits become available per second
+   * @throws IllegalArgumentException if {@code permitsPerSecond} is negative 
or zero
+   */
+  // TODO(user): "This is equivalent to
+  //                 {@code createWithCapacity(permitsPerSecond, 1, 
TimeUnit.SECONDS)}".
+  public static RateLimiter create(double permitsPerSecond) {
+    /*
+     * The default RateLimiter configuration can save the unused permits of up 
to one second.
+     * This is to avoid unnecessary stalls in situations like this: A 
RateLimiter of 1qps,
+     * and 4 threads, all calling acquire() at these moments:
+     *
+     * T0 at 0 seconds
+     * T1 at 1.05 seconds
+     * T2 at 2 seconds
+     * T3 at 3 seconds
+     *
+     * Due to the slight delay of T1, T2 would have to sleep till 2.05 seconds,
+     * and T3 would also have to sleep till 3.05 seconds.
+     */
+    return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
+  }
+
+  /*
+   * TODO(cpovirk): make SleepingStopwatch the last parameter throughout the 
class so that the
+   * overloads follow the usual convention: Foo(int), Foo(int, 
SleepingStopwatch)
+   */
+  @VisibleForTesting
+  static RateLimiter create(SleepingStopwatch stopwatch, double 
permitsPerSecond) {
+    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* 
maxBurstSeconds */);
+    rateLimiter.setRate(permitsPerSecond);
+    return rateLimiter;
+  }
+
+  /**
+   * Creates a {@code RateLimiter} with the specified stable throughput, given 
as
+   * "permits per second" (commonly referred to as <i>QPS</i>, queries per 
second), and a
+   * <i>warmup period</i>, during which the {@code RateLimiter} smoothly ramps 
up its rate,
+   * until it reaches its maximum rate at the end of the period (as long as 
there are enough
+   * requests to saturate it). Similarly, if the {@code RateLimiter} is left 
<i>unused</i> for
+   * a duration of {@code warmupPeriod}, it will gradually return to its 
"cold" state,
+   * i.e. it will go through the same warming up process as when it was first 
created.
+   *
+   * <p>The returned {@code RateLimiter} is intended for cases where the 
resource that actually
+   * fulfills the requests (e.g., a remote server) needs "warmup" time, rather 
than
+   * being immediately accessed at the stable (maximum) rate.
+   *
+   * <p>The returned {@code RateLimiter} starts in a "cold" state (i.e. the 
warmup period
+   * will follow), and if it is left unused for long enough, it will return to 
that state.
+   *
+   * @param permitsPerSecond the rate of the returned {@code RateLimiter}, 
measured in
+   *        how many permits become available per second
+   * @param warmupPeriod the duration of the period where the {@code 
RateLimiter} ramps up its
+   *        rate, before reaching its stable (maximum) rate
+   * @param unit the time unit of the warmupPeriod argument
+   * @throws IllegalArgumentException if {@code permitsPerSecond} is negative 
or zero or
+   *     {@code warmupPeriod} is negative
+   */
+  public static RateLimiter create(double permitsPerSecond, long warmupPeriod, 
TimeUnit unit) {
+    checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", 
warmupPeriod);
+    return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, 
warmupPeriod, unit);
+  }
+
+  @VisibleForTesting
+  static RateLimiter create(
+      SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, 
TimeUnit unit) {
+    RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, 
unit);
+    rateLimiter.setRate(permitsPerSecond);
+    return rateLimiter;
+  }
+
+  /**
+   * The underlying timer; used both to measure elapsed time and sleep as 
necessary. A separate
+   * object to facilitate testing.
+   */
+  private final SleepingStopwatch stopwatch;
+
+  // Can't be initialized in the constructor because mocks don't call the 
constructor.
+  private volatile Object mutexDoNotUseDirectly;
+
+  private Object mutex() {
+    Object mutex = mutexDoNotUseDirectly;
+    if (mutex == null) {
+      synchronized (this) {
+        mutex = mutexDoNotUseDirectly;
+        if (mutex == null) {
+          mutexDoNotUseDirectly = mutex = new Object();
+        }
+      }
+    }
+    return mutex;
+  }
+
+  RateLimiter(SleepingStopwatch stopwatch) {
+    this.stopwatch = checkNotNull(stopwatch);
+  }
+
+  /**
+   * Updates the stable rate of this {@code RateLimiter}, that is, the
+   * {@code permitsPerSecond} argument provided in the factory method that
+   * constructed the {@code RateLimiter}. Currently throttled threads will 
<b>not</b>
+   * be awakened as a result of this invocation, thus they do not observe the 
new rate;
+   * only subsequent requests will.
+   *
+   * <p>Note though that, since each request repays (by waiting, if necessary) 
the cost
+   * of the <i>previous</i> request, this means that the very next request
+   * after an invocation to {@code setRate} will not be affected by the new 
rate;
+   * it will pay the cost of the previous request, which is in terms of the 
previous rate.
+   *
+   * <p>The behavior of the {@code RateLimiter} is not modified in any other 
way,
+   * e.g. if the {@code RateLimiter} was configured with a warmup period of 20 
seconds,
+   * it still has a warmup period of 20 seconds after this method invocation.
+   *
+   * @param permitsPerSecond the new stable rate of this {@code RateLimiter}
+   * @throws IllegalArgumentException if {@code permitsPerSecond} is negative 
or zero
+   */
+  public final void setRate(double permitsPerSecond) {
+    checkArgument(
+        permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must 
be positive");
+    synchronized (mutex()) {
+      doSetRate(permitsPerSecond, stopwatch.readMicros());
+    }
+  }
+
+  abstract void doSetRate(double permitsPerSecond, long nowMicros);
+
+  /**
+   * Returns the stable rate (as {@code permits per seconds}) with which this
+   * {@code RateLimiter} is configured with. The initial value of this is the 
same as
+   * the {@code permitsPerSecond} argument passed in the factory method that 
produced
+   * this {@code RateLimiter}, and it is only updated after invocations
+   * to {@linkplain #setRate}.
+   */
+  public final double getRate() {
+    synchronized (mutex()) {
+      return doGetRate();
+    }
+  }
+
+  abstract double doGetRate();
+
+  /**
+   * Acquires a single permit from this {@code RateLimiter}, blocking until the
+   * request can be granted. Tells the amount of time slept, if any.
+   *
+   * <p>This method is equivalent to {@code acquire(1)}.
+   *
+   * @return time spent sleeping to enforce rate, in seconds; 0.0 if not 
rate-limited
+   * @since 16.0 (present in 13.0 with {@code void} return type})
+   */
+  public double acquire() {
+    return acquire(1);
+  }
+
+  /**
+   * Acquires the given number of permits from this {@code RateLimiter}, 
blocking until the
+   * request can be granted. Tells the amount of time slept, if any.
+   *
+   * @param permits the number of permits to acquire
+   * @return time spent sleeping to enforce rate, in seconds; 0.0 if not 
rate-limited
+   * @throws IllegalArgumentException if the requested number of permits is 
negative or zero
+   * @since 16.0 (present in 13.0 with {@code void} return type})
+   */
+  public double acquire(int permits) {
+    long microsToWait = reserve(permits);
+    stopwatch.sleepMicrosUninterruptibly(microsToWait);
+    return 1.0 * microsToWait / SECONDS.toMicros(1L);
+  }
+
+  /**
+   * Reserves the given number of permits from this {@code RateLimiter} for 
future use, returning
+   * the number of microseconds until the reservation can be consumed.
+   *
+   * @return time in microseconds to wait until the resource can be acquired, 
never negative
+   */
+  final long reserve(int permits) {
+    checkPermits(permits);
+    synchronized (mutex()) {
+      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
+    }
+  }
+
+  /**
+   * Acquires a permit from this {@code RateLimiter} if it can be obtained
+   * without exceeding the specified {@code timeout}, or returns {@code false}
+   * immediately (without waiting) if the permit would not have been granted
+   * before the timeout expired.
+   *
+   * <p>This method is equivalent to {@code tryAcquire(1, timeout, unit)}.
+   *
+   * @param timeout the maximum time to wait for the permit. Negative values 
are treated as zero.
+   * @param unit the time unit of the timeout argument
+   * @return {@code true} if the permit was acquired, {@code false} otherwise
+   * @throws IllegalArgumentException if the requested number of permits is 
negative or zero
+   */
+  public boolean tryAcquire(long timeout, TimeUnit unit) {
+    return tryAcquire(1, timeout, unit);
+  }
+
+  /**
+   * Acquires permits from this {@link RateLimiter} if it can be acquired 
immediately without delay.
+   *
+   * <p>
+   * This method is equivalent to {@code tryAcquire(permits, 0, anyUnit)}.
+   *
+   * @param permits the number of permits to acquire
+   * @return {@code true} if the permits were acquired, {@code false} otherwise
+   * @throws IllegalArgumentException if the requested number of permits is 
negative or zero
+   * @since 14.0
+   */
+  public boolean tryAcquire(int permits) {
+    return tryAcquire(permits, 0, MICROSECONDS);
+  }
+
+  /**
+   * Acquires a permit from this {@link RateLimiter} if it can be acquired 
immediately without
+   * delay.
+   *
+   * <p>
+   * This method is equivalent to {@code tryAcquire(1)}.
+   *
+   * @return {@code true} if the permit was acquired, {@code false} otherwise
+   * @since 14.0
+   */
+  public boolean tryAcquire() {
+    return tryAcquire(1, 0, MICROSECONDS);
+  }
+
+  /**
+   * Acquires the given number of permits from this {@code RateLimiter} if it 
can be obtained
+   * without exceeding the specified {@code timeout}, or returns {@code false}
+   * immediately (without waiting) if the permits would not have been granted
+   * before the timeout expired.
+   *
+   * @param permits the number of permits to acquire
+   * @param timeout the maximum time to wait for the permits. Negative values 
are treated as zero.
+   * @param unit the time unit of the timeout argument
+   * @return {@code true} if the permits were acquired, {@code false} otherwise
+   * @throws IllegalArgumentException if the requested number of permits is 
negative or zero
+   */
+  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
+    long timeoutMicros = max(unit.toMicros(timeout), 0);
+    checkPermits(permits);
+    long microsToWait;
+    synchronized (mutex()) {
+      long nowMicros = stopwatch.readMicros();
+      if (!canAcquire(nowMicros, timeoutMicros)) {
+        return false;
+      } else {
+        microsToWait = reserveAndGetWaitLength(permits, nowMicros);
+      }
+    }
+    stopwatch.sleepMicrosUninterruptibly(microsToWait);
+    return true;
+  }
+
+  private boolean canAcquire(long nowMicros, long timeoutMicros) {
+    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
+  }
+
+  /**
+   * Reserves next ticket and returns the wait time that the caller must wait 
for.
+   *
+   * @return the required wait time, never negative
+   */
+  final long reserveAndGetWaitLength(int permits, long nowMicros) {
+    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
+    return max(momentAvailable - nowMicros, 0);
+  }
+
+  /**
+   * Returns the earliest time that permits are available (with one caveat).
+   *
+   * @return the time that permits are available, or, if permits are available 
immediately, an
+   *     arbitrary past or present time
+   */
+  abstract long queryEarliestAvailable(long nowMicros);
+
+  /**
+   * Reserves the requested number of permits and returns the time that those 
permits can be used
+   * (with one caveat).
+     *
+   * @return the time that the permits may be used, or, if the permits may be 
used immediately, an
+   *     arbitrary past or present time
+     */
+  abstract long reserveEarliestAvailable(int permits, long nowMicros);
+
+  @Override
+  public String toString() {
+    return String.format("RateLimiter[stableRate=%3.1fqps]", getRate());
+  }
+
+  @VisibleForTesting
+  abstract static class SleepingStopwatch {
+    /*
+     * We always hold the mutex when calling this. TODO(cpovirk): Is that 
important? Perhaps we need
+     * to guarantee that each call to reserveEarliestAvailable, etc. sees a 
value >= the previous?
+     * Also, is it OK that we don't hold the mutex when sleeping?
+     */
+    abstract long readMicros();
+
+    abstract void sleepMicrosUninterruptibly(long micros);
+
+    static final SleepingStopwatch createFromSystemTimer() {
+      return new SleepingStopwatch() {
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+
+        @Override
+        long readMicros() {
+          return stopwatch.elapsed(MICROSECONDS);
+        }
+
+        @Override
+        void sleepMicrosUninterruptibly(long micros) {
+          if (micros > 0) {
+            Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS);
+          }
+        }
+      };
+    }
+  }
+
+  private static int checkPermits(int permits) {
+    checkArgument(permits > 0, "Requested permits (%s) must be positive", 
permits);
+    return permits;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/20338fc6/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/SmoothRateLimiter.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/SmoothRateLimiter.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/SmoothRateLimiter.java
new file mode 100644
index 0000000..5f1ab5f
--- /dev/null
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/SmoothRateLimiter.java
@@ -0,0 +1,389 @@
+package org.apache.flume.source.shaded.guava;
+
+/*
+ * Copyright (C) 2012 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.lang.Math.min;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.concurrent.TimeUnit;
+
+abstract class SmoothRateLimiter extends RateLimiter {
+  /*
+   * How is the RateLimiter designed, and why?
+   *
+   * The primary feature of a RateLimiter is its "stable rate", the maximum 
rate that
+   * is should allow at normal conditions. This is enforced by "throttling" 
incoming
+   * requests as needed, i.e. compute, for an incoming request, the 
appropriate throttle time,
+   * and make the calling thread wait as much.
+   *
+   * The simplest way to maintain a rate of QPS is to keep the timestamp of 
the last
+   * granted request, and ensure that (1/QPS) seconds have elapsed since then. 
For example,
+   * for a rate of QPS=5 (5 tokens per second), if we ensure that a request 
isn't granted
+   * earlier than 200ms after the last one, then we achieve the intended rate.
+   * If a request comes and the last request was granted only 100ms ago, then 
we wait for
+   * another 100ms. At this rate, serving 15 fresh permits (i.e. for an 
acquire(15) request)
+   * naturally takes 3 seconds.
+   *
+   * It is important to realize that such a RateLimiter has a very superficial 
memory
+   * of the past: it only remembers the last request. What if the RateLimiter 
was unused for
+   * a long period of time, then a request arrived and was immediately granted?
+   * This RateLimiter would immediately forget about that past 
underutilization. This may
+   * result in either underutilization or overflow, depending on the real 
world consequences
+   * of not using the expected rate.
+   *
+   * Past underutilization could mean that excess resources are available. 
Then, the RateLimiter
+   * should speed up for a while, to take advantage of these resources. This 
is important
+   * when the rate is applied to networking (limiting bandwidth), where past 
underutilization
+   * typically translates to "almost empty buffers", which can be filled 
immediately.
+   *
+   * On the other hand, past underutilization could mean that "the server 
responsible for
+   * handling the request has become less ready for future requests", i.e. its 
caches become
+   * stale, and requests become more likely to trigger expensive operations (a 
more extreme
+   * case of this example is when a server has just booted, and it is mostly 
busy with getting
+   * itself up to speed).
+   *
+   * To deal with such scenarios, we add an extra dimension, that of "past 
underutilization",
+   * modeled by "storedPermits" variable. This variable is zero when there is 
no
+   * underutilization, and it can grow up to maxStoredPermits, for 
sufficiently large
+   * underutilization. So, the requested permits, by an invocation 
acquire(permits),
+   * are served from:
+   * - stored permits (if available)
+   * - fresh permits (for any remaining permits)
+   *
+   * How this works is best explained with an example:
+   *
+   * For a RateLimiter that produces 1 token per second, every second
+   * that goes by with the RateLimiter being unused, we increase storedPermits 
by 1.
+   * Say we leave the RateLimiter unused for 10 seconds (i.e., we expected a 
request at time
+   * X, but we are at time X + 10 seconds before a request actually arrives; 
this is
+   * also related to the point made in the last paragraph), thus storedPermits
+   * becomes 10.0 (assuming maxStoredPermits >= 10.0). At that point, a 
request of acquire(3)
+   * arrives. We serve this request out of storedPermits, and reduce that to 
7.0 (how this is
+   * translated to throttling time is discussed later). Immediately after, 
assume that an
+   * acquire(10) request arriving. We serve the request partly from 
storedPermits,
+   * using all the remaining 7.0 permits, and the remaining 3.0, we serve them 
by fresh permits
+   * produced by the rate limiter.
+   *
+   * We already know how much time it takes to serve 3 fresh permits: if the 
rate is
+   * "1 token per second", then this will take 3 seconds. But what does it 
mean to serve 7
+   * stored permits? As explained above, there is no unique answer. If we are 
primarily
+   * interested to deal with underutilization, then we want stored permits to 
be given out
+   * /faster/ than fresh ones, because underutilization = free resources for 
the taking.
+   * If we are primarily interested to deal with overflow, then stored permits 
could
+   * be given out /slower/ than fresh ones. Thus, we require a (different in 
each case)
+   * function that translates storedPermits to throtting time.
+   *
+   * This role is played by storedPermitsToWaitTime(double storedPermits, 
double permitsToTake).
+   * The underlying model is a continuous function mapping storedPermits
+   * (from 0.0 to maxStoredPermits) onto the 1/rate (i.e. intervals) that is 
effective at the given
+   * storedPermits. "storedPermits" essentially measure unused time; we spend 
unused time
+   * buying/storing permits. Rate is "permits / time", thus "1 / rate = time / 
permits".
+   * Thus, "1/rate" (time / permits) times "permits" gives time, i.e., 
integrals on this
+   * function (which is what storedPermitsToWaitTime() computes) correspond to 
minimum intervals
+   * between subsequent requests, for the specified number of requested 
permits.
+   *
+   * Here is an example of storedPermitsToWaitTime:
+   * If storedPermits == 10.0, and we want 3 permits, we take them from 
storedPermits,
+   * reducing them to 7.0, and compute the throttling for these as a call to
+   * storedPermitsToWaitTime(storedPermits = 10.0, permitsToTake = 3.0), which 
will
+   * evaluate the integral of the function from 7.0 to 10.0.
+   *
+   * Using integrals guarantees that the effect of a single acquire(3) is 
equivalent
+   * to { acquire(1); acquire(1); acquire(1); }, or { acquire(2); acquire(1); 
}, etc,
+   * since the integral of the function in [7.0, 10.0] is equivalent to the 
sum of the
+   * integrals of [7.0, 8.0], [8.0, 9.0], [9.0, 10.0] (and so on), no matter
+   * what the function is. This guarantees that we handle correctly requests 
of varying weight
+   * (permits), /no matter/ what the actual function is - so we can tweak the 
latter freely.
+   * (The only requirement, obviously, is that we can compute its integrals).
+   *
+   * Note well that if, for this function, we chose a horizontal line, at 
height of exactly
+   * (1/QPS), then the effect of the function is non-existent: we serve 
storedPermits at
+   * exactly the same cost as fresh ones (1/QPS is the cost for each). We use 
this trick later.
+   *
+   * If we pick a function that goes /below/ that horizontal line, it means 
that we reduce
+   * the area of the function, thus time. Thus, the RateLimiter becomes 
/faster/ after a
+   * period of underutilization. If, on the other hand, we pick a function that
+   * goes /above/ that horizontal line, then it means that the area (time) is 
increased,
+   * thus storedPermits are more costly than fresh permits, thus the 
RateLimiter becomes
+   * /slower/ after a period of underutilization.
+   *
+   * Last, but not least: consider a RateLimiter with rate of 1 permit per 
second, currently
+   * completely unused, and an expensive acquire(100) request comes. It would 
be nonsensical
+   * to just wait for 100 seconds, and /then/ start the actual task. Why wait 
without doing
+   * anything? A much better approach is to /allow/ the request right away (as 
if it was an
+   * acquire(1) request instead), and postpone /subsequent/ requests as 
needed. In this version,
+   * we allow starting the task immediately, and postpone by 100 seconds 
future requests,
+   * thus we allow for work to get done in the meantime instead of waiting 
idly.
+   *
+   * This has important consequences: it means that the RateLimiter doesn't 
remember the time
+   * of the _last_ request, but it remembers the (expected) time of the _next_ 
request. This
+   * also enables us to tell immediately (see tryAcquire(timeout)) whether a 
particular
+   * timeout is enough to get us to the point of the next scheduling time, 
since we always
+   * maintain that. And what we mean by "an unused RateLimiter" is also 
defined by that
+   * notion: when we observe that the "expected arrival time of the next 
request" is actually
+   * in the past, then the difference (now - past) is the amount of time that 
the RateLimiter
+   * was formally unused, and it is that amount of time which we translate to 
storedPermits.
+   * (We increase storedPermits with the amount of permits that would have 
been produced
+   * in that idle time). So, if rate == 1 permit per second, and arrivals come 
exactly
+   * one second after the previous, then storedPermits is _never_ increased -- 
we would only
+   * increase it for arrivals _later_ than the expected one second.
+   */
+
+  /**
+   * This implements the following function:
+   *
+   *          ^ throttling
+   *          |
+   * 3*stable +                  /
+   * interval |                 /.
+   *  (cold)  |                / .
+   *          |               /  .   <-- "warmup period" is the area of the 
trapezoid between
+   * 2*stable +              /   .       halfPermits and maxPermits
+   * interval |             /    .
+   *          |            /     .
+   *          |           /      .
+   *   stable +----------/  WARM . }
+   * interval |          .   UP  . } <-- this rectangle (from 0 to maxPermits, 
and
+   *          |          . PERIOD. }     height == stableInterval) defines the 
cooldown period,
+   *          |          .       . }     and we want cooldownPeriod == 
warmupPeriod
+   *          |---------------------------------> storedPermits
+   *              (halfPermits) (maxPermits)
+   *
+   * Before going into the details of this particular function, let's keep in 
mind the basics:
+   * 1) The state of the RateLimiter (storedPermits) is a vertical line in 
this figure.
+   * 2) When the RateLimiter is not used, this goes right (up to maxPermits)
+   * 3) When the RateLimiter is used, this goes left (down to zero), since if 
we have storedPermits,
+   *    we serve from those first
+   * 4) When _unused_, we go right at the same speed (rate)! I.e., if our rate 
is
+   *    2 permits per second, and 3 unused seconds pass, we will always save 6 
permits
+   *    (no matter what our initial position was), up to maxPermits.
+   *    If we invert the rate, we get the "stableInterval" (interval between 
two requests
+   *    in a perfectly spaced out sequence of requests of the given rate). 
Thus, if you
+   *    want to see "how much time it will take to go from X storedPermits to 
X+K storedPermits?",
+   *    the answer is always stableInterval * K. In the same example, for 2 
permits per second,
+   *    stableInterval is 500ms. Thus to go from X storedPermits to X+6 
storedPermits, we
+   *    require 6 * 500ms = 3 seconds.
+   *
+   *    In short, the time it takes to move to the right (save K permits) is 
equal to the
+   *    rectangle of width == K and height == stableInterval.
+   * 4) When _used_, the time it takes, as explained in the introductory class 
note, is
+   *    equal to the integral of our function, between X permits and X-K 
permits, assuming
+   *    we want to spend K saved permits.
+   *
+   *    In summary, the time it takes to move to the left (spend K permits), 
is equal to the
+   *    area of the function of width == K.
+   *
+   * Let's dive into this function now:
+   *
+   * When we have storedPermits <= halfPermits (the left portion of the 
function), then
+   * we spend them at the exact same rate that
+   * fresh permits would be generated anyway (that rate is 1/stableInterval). 
We size
+   * this area to be equal to _half_ the specified warmup period. Why we need 
this?
+   * And why half? We'll explain shortly below (after explaining the second 
part).
+   *
+   * Stored permits that are beyond halfPermits, are mapped to an ascending 
line, that goes
+   * from stableInterval to 3 * stableInterval. The average height for that 
part is
+   * 2 * stableInterval, and is sized appropriately to have an area _equal_ to 
the
+   * specified warmup period. Thus, by point (4) above, it takes 
"warmupPeriod" amount of time
+   * to go from maxPermits to halfPermits.
+   *
+   * BUT, by point (3) above, it only takes "warmupPeriod / 2" amount of time 
to return back
+   * to maxPermits, from halfPermits! (Because the trapezoid has double the 
area of the rectangle
+   * of height stableInterval and equivalent width). We decided that the 
"cooldown period"
+   * time should be equivalent to "warmup period", thus a fully saturated 
RateLimiter
+   * (with zero stored permits, serving only fresh ones) can go to a fully 
unsaturated
+   * (with storedPermits == maxPermits) in the same amount of time it takes 
for a fully
+   * unsaturated RateLimiter to return to the stableInterval -- which happens 
in halfPermits,
+   * since beyond that point, we use a horizontal line of "stableInterval" 
height, simulating
+   * the regular rate.
+   *
+   * Thus, we have figured all dimensions of this shape, to give all the 
desired
+   * properties:
+   * - the width is warmupPeriod / stableInterval, to make cooldownPeriod == 
warmupPeriod
+   * - the slope starts at the middle, and goes from stableInterval to 
3*stableInterval so
+   *   to have halfPermits being spent in double the usual time (half the 
rate), while their
+   *   respective rate is steadily ramping up
+   */
  /**
   * SmoothRateLimiter variant implementing the trapezoid "warmup" function described in the
   * figure above: stored permits above halfPermits are spent at a ramped-up (slower) interval,
   * so a cold limiter gradually accelerates to its stable rate over warmupPeriod.
   */
  static final class SmoothWarmingUp extends SmoothRateLimiter {
    // Length of the warmup ramp, in microseconds (fixed at construction).
    private final long warmupPeriodMicros;
    /**
     * The slope of the line from the stable interval (when permits == 0), to the cold interval
     * (when permits == maxPermits)
     */
    private double slope;
    // Boundary between the flat (stable-interval) part and the climbing part of the function.
    private double halfPermits;

    SmoothWarmingUp(SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit) {
      super(stopwatch);
      this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
    }

    /**
     * Recomputes the trapezoid's dimensions for the new rate and rescales storedPermits so the
     * limiter keeps the same relative "warmth" across a rate change.
     */
    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = maxPermits;
      // Width of the figure: chosen so cooldownPeriod == warmupPeriod (see class comment).
      maxPermits = warmupPeriodMicros / stableIntervalMicros;
      halfPermits = maxPermits / 2.0;
      // Stable interval is x, cold is 3x, so on average it's 2x. Double the time -> halve the rate
      double coldIntervalMicros = stableIntervalMicros * 3.0;
      slope = (coldIntervalMicros - stableIntervalMicros) / halfPermits;
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
      } else {
        storedPermits = (oldMaxPermits == 0.0)
            ? maxPermits // initial state is cold
            : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    /**
     * Integrates the trapezoid function over [storedPermits - permitsToTake, storedPermits]
     * to obtain the throttling time (in micros) for spending permitsToTake stored permits.
     */
    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      double availablePermitsAboveHalf = storedPermits - halfPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line)
      if (availablePermitsAboveHalf > 0.0) {
        double permitsAboveHalfToTake = min(availablePermitsAboveHalf, permitsToTake);
        // Trapezoid area: width * (height at left edge + height at right edge) / 2.
        micros = (long) (permitsAboveHalfToTake * (permitsToTime(availablePermitsAboveHalf)
            + permitsToTime(availablePermitsAboveHalf - permitsAboveHalfToTake)) / 2.0);
        permitsToTake -= permitsAboveHalfToTake;
      }
      // measuring the integral on the left part of the function (the horizontal line)
      micros += (stableIntervalMicros * permitsToTake);
      return micros;
    }

    // Height of the function (the effective interval, in micros) at the given permit count.
    private double permitsToTime(double permits) {
      return stableIntervalMicros + permits * slope;
    }
  }
+
+  /**
+   * This implements a "bursty" RateLimiter, where storedPermits are 
translated to
+   * zero throttling. The maximum number of permits that can be saved (when 
the RateLimiter is
+   * unused) is defined in terms of time, in this sense: if a RateLimiter is 
2qps, and this
+   * time is specified as 10 seconds, we can save up to 2 * 10 = 20 permits. 
+   */
+  static final class SmoothBursty extends SmoothRateLimiter {
+    /** The work (permits) of how many seconds can be saved up if this 
RateLimiter is unused? */
+    final double maxBurstSeconds; 
+    
+    SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
+      super(stopwatch);
+      this.maxBurstSeconds = maxBurstSeconds;
+    }
+  
+    @Override
+    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
+      double oldMaxPermits = this.maxPermits;
+      maxPermits = maxBurstSeconds * permitsPerSecond;
+      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
+        // if we don't special-case this, we would get storedPermits == NaN, 
below
+        storedPermits = maxPermits;
+      } else {
+        storedPermits = (oldMaxPermits == 0.0)
+            ? 0.0 // initial state
+            : storedPermits * maxPermits / oldMaxPermits;
+      }
+    }
+  
+    @Override
+    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
+      return 0L;
+    }
+  }
+
  /**
   * The currently stored permits. May be fractional; grows while the limiter is idle
   * (see resync) and shrinks as requests are served from it.
   */
  double storedPermits;

  /**
   * The maximum number of stored permits. Each concrete subclass derives this from its own
   * configuration in doSetRate.
   */
  double maxPermits;

  /**
   * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
   * per second has a stable interval of 200ms.
   */
  double stableIntervalMicros;

  /**
   * The time when the next request (no matter its size) will be granted. After granting a
   * request, this is pushed further in the future. Large requests push this further than small
   * requests.
   *
   * <p>NOTE(review): mutation of this state appears to rely on external synchronization
   * provided by the RateLimiter superclass (not visible here) -- confirm before assuming
   * thread-safety of these fields in isolation.
   */
  private long nextFreeTicketMicros = 0L; // could be either in the past or future

  // Instantiable only via the nested subclasses above; the stopwatch supplies all timing.
  private SmoothRateLimiter(SleepingStopwatch stopwatch) {
    super(stopwatch);
  }
+
  /**
   * Applies a new rate: first folds any idle time into storedPermits (resync), then records
   * the new stable interval and lets the subclass adjust its own state.
   */
  @Override
  final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    // 1 second expressed in micros, divided by the rate, gives micros-per-permit.
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

  /** Subclass hook: update maxPermits/storedPermits for the new rate. */
  abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);
+
  /** Returns the current rate in permits per second (the inverse of the stable interval). */
  @Override
  final double doGetRate() {
    return SECONDS.toMicros(1L) / stableIntervalMicros;
  }
+
  /**
   * Returns the time (micros) at which the next request will be granted, without reserving
   * anything; used by tryAcquire(timeout) to decide whether waiting is worthwhile.
   */
  @Override
  final long queryEarliestAvailable(long nowMicros) {
    return nextFreeTicketMicros;
  }
+
+  @Override
+  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
+    resync(nowMicros);
+    long returnValue = nextFreeTicketMicros;
+    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
+    double freshPermits = requiredPermits - storedPermitsToSpend;
+
+    long waitMicros = storedPermitsToWaitTime(this.storedPermits, 
storedPermitsToSpend)
+        + (long) (freshPermits * stableIntervalMicros);
+
+    this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
+    this.storedPermits -= storedPermitsToSpend;
+    return returnValue;
+  }
+
  /**
   * Translates a specified portion of our currently stored permits which we want to
   * spend/acquire, into a throttling time. Conceptually, this evaluates the integral
   * of the underlying function we use, for the range of
   * [(storedPermits - permitsToTake), storedPermits].
   *
   * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
   *
   * @param storedPermits the permit count before spending
   * @param permitsToTake how many of those stored permits are being spent now
   * @return the throttling time in microseconds
   */
  abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);
+
+  private void resync(long nowMicros) {
+    // if nextFreeTicket is in the past, resync to now
+    if (nowMicros > nextFreeTicketMicros) {
+      storedPermits = min(maxPermits,
+          storedPermits + (nowMicros - nextFreeTicketMicros) / 
stableIntervalMicros);
+      nextFreeTicketMicros = nowMicros;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/20338fc6/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/Stopwatch.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/Stopwatch.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/Stopwatch.java
new file mode 100644
index 0000000..277bf15
--- /dev/null
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/Stopwatch.java
@@ -0,0 +1,267 @@
+package org.apache.flume.source.shaded.guava;
+/*
+ * Copyright (C) 2008 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.TimeUnit.DAYS;
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.GwtCompatible;
+import com.google.common.annotations.GwtIncompatible;
+import com.google.common.base.Ticker;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An object that measures elapsed time in nanoseconds. It is useful to measure
+ * elapsed time using this class instead of direct calls to {@link
+ * System#nanoTime} for a few reasons:
+ *
+ * <ul>
+ * <li>An alternate time source can be substituted, for testing or performance
+ *     reasons.
+ * <li>As documented by {@code nanoTime}, the value returned has no absolute
+ *     meaning, and can only be interpreted as relative to another timestamp
+ *     returned by {@code nanoTime} at a different time. {@code Stopwatch} is a
+ *     more effective abstraction because it exposes only these relative 
values,
+ *     not the absolute ones.
+ * </ul>
+ *
+ * <p>Basic usage:
+ * <pre>
+ *   Stopwatch stopwatch = Stopwatch.{@link #createStarted createStarted}();
+ *   doSomething();
+ *   stopwatch.{@link #stop stop}(); // optional
+ *
+ *   long millis = stopwatch.elapsed(MILLISECONDS);
+ *
+ *   log.info("time: " + stopwatch); // formatted string like "12.3 ms"</pre>
+ *
+ * <p>Stopwatch methods are not idempotent; it is an error to start or stop a
+ * stopwatch that is already in the desired state.
+ *
+ * <p>When testing code that uses this class, use
+ * {@link #createUnstarted(Ticker)} or {@link #createStarted(Ticker)} to
+ * supply a fake or mock ticker.
+ * <!-- TODO(kevinb): restore the "such as" --> This allows you to
+ * simulate any valid behavior of the stopwatch.
+ *
+ * <p><b>Note:</b> This class is not thread-safe.
+ *
+ * @author Kevin Bourrillion
+ * @since 10.0
+ */
@Beta
@GwtCompatible(emulated = true)
public final class Stopwatch {
  // Time source; injectable so tests can control the clock.
  private final Ticker ticker;
  private boolean isRunning;
  // Accumulated elapsed time across previous start/stop cycles.
  private long elapsedNanos;
  // Ticker reading at the most recent start(); only meaningful while isRunning.
  private long startTick;

  /**
   * Creates (but does not start) a new stopwatch using {@link System#nanoTime}
   * as its time source.
   *
   * @since 15.0
   */
  public static Stopwatch createUnstarted() {
    return new Stopwatch();
  }

  /**
   * Creates (but does not start) a new stopwatch, using the specified time
   * source.
   *
   * @since 15.0
   */
  public static Stopwatch createUnstarted(Ticker ticker) {
    return new Stopwatch(ticker);
  }

  /**
   * Creates (and starts) a new stopwatch using {@link System#nanoTime}
   * as its time source.
   *
   * @since 15.0
   */
  public static Stopwatch createStarted() {
    return new Stopwatch().start();
  }

  /**
   * Creates (and starts) a new stopwatch, using the specified time
   * source.
   *
   * @since 15.0
   */
  public static Stopwatch createStarted(Ticker ticker) {
    return new Stopwatch(ticker).start();
  }

  /**
   * Creates (but does not start) a new stopwatch using {@link System#nanoTime}
   * as its time source.
   *
   * @deprecated Use {@link Stopwatch#createUnstarted()} instead.
   */
  @Deprecated
  Stopwatch() {
    this(Ticker.systemTicker());
  }

  /**
   * Creates (but does not start) a new stopwatch, using the specified time
   * source.
   *
   * @deprecated Use {@link Stopwatch#createUnstarted(Ticker)} instead.
   */
  @Deprecated
  Stopwatch(Ticker ticker) {
    this.ticker = checkNotNull(ticker, "ticker");
  }

  /**
   * Returns {@code true} if {@link #start()} has been called on this stopwatch,
   * and {@link #stop()} has not been called since the last call to {@code
   * start()}.
   */
  public boolean isRunning() {
    return isRunning;
  }

  /**
   * Starts the stopwatch.
   *
   * @return this {@code Stopwatch} instance
   * @throws IllegalStateException if the stopwatch is already running.
   */
  public Stopwatch start() {
    checkState(!isRunning, "This stopwatch is already running.");
    isRunning = true;
    startTick = ticker.read();
    return this;
  }

  /**
   * Stops the stopwatch. Future reads will return the fixed duration that had
   * elapsed up to this point.
   *
   * @return this {@code Stopwatch} instance
   * @throws IllegalStateException if the stopwatch is already stopped.
   */
  public Stopwatch stop() {
    // Read the ticker before the state check so the check's own cost is not measured.
    long tick = ticker.read();
    checkState(isRunning, "This stopwatch is already stopped.");
    isRunning = false;
    elapsedNanos += tick - startTick;
    return this;
  }

  /**
   * Sets the elapsed time for this stopwatch to zero,
   * and places it in a stopped state.
   *
   * @return this {@code Stopwatch} instance
   */
  public Stopwatch reset() {
    elapsedNanos = 0;
    isRunning = false;
    return this;
  }

  // Total elapsed nanos: accumulated time plus the in-progress interval, if running.
  private long elapsedNanos() {
    return isRunning ? ticker.read() - startTick + elapsedNanos : elapsedNanos;
  }

  /**
   * Returns the current elapsed time shown on this stopwatch, expressed
   * in the desired time unit, with any fraction rounded down.
   *
   * <p>Note that the overhead of measurement can be more than a microsecond, so
   * it is generally not useful to specify {@link TimeUnit#NANOSECONDS}
   * precision here.
   *
   * @since 14.0 (since 10.0 as {@code elapsedTime()})
   */
  public long elapsed(TimeUnit desiredUnit) {
    return desiredUnit.convert(elapsedNanos(), NANOSECONDS);
  }

  /**
   * Returns a string representation of the current elapsed time, formatted with
   * four significant figures and the largest unit that yields a value >= 1.
   */
  @GwtIncompatible("String.format()")
  @Override public String toString() {
    long nanos = elapsedNanos();

    TimeUnit unit = chooseUnit(nanos);
    double value = (double) nanos / NANOSECONDS.convert(1, unit);

    // Too bad this functionality is not exposed as a regular method call
    return String.format("%.4g %s", value, abbreviate(unit));
  }

  // Largest TimeUnit in which the given duration is at least 1.
  private static TimeUnit chooseUnit(long nanos) {
    if (DAYS.convert(nanos, NANOSECONDS) > 0) {
      return DAYS;
    }
    if (HOURS.convert(nanos, NANOSECONDS) > 0) {
      return HOURS;
    }
    if (MINUTES.convert(nanos, NANOSECONDS) > 0) {
      return MINUTES;
    }
    if (SECONDS.convert(nanos, NANOSECONDS) > 0) {
      return SECONDS;
    }
    if (MILLISECONDS.convert(nanos, NANOSECONDS) > 0) {
      return MILLISECONDS;
    }
    if (MICROSECONDS.convert(nanos, NANOSECONDS) > 0) {
      return MICROSECONDS;
    }
    return NANOSECONDS;
  }

  // Short human-readable suffix for a TimeUnit, used by toString().
  private static String abbreviate(TimeUnit unit) {
    switch (unit) {
      case NANOSECONDS:
        return "ns";
      case MICROSECONDS:
        return "\u03bcs"; // μs
      case MILLISECONDS:
        return "ms";
      case SECONDS:
        return "s";
      case MINUTES:
        return "min";
      case HOURS:
        return "h";
      case DAYS:
        return "d";
      default:
        throw new AssertionError();
    }
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/20338fc6/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/Uninterruptibles.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/Uninterruptibles.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/Uninterruptibles.java
new file mode 100644
index 0000000..fe5df25
--- /dev/null
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/shaded/guava/Uninterruptibles.java
@@ -0,0 +1,332 @@
+package org.apache.flume.source.shaded.guava;
+
+/*
+ * Copyright (C) 2011 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Utilities for treating interruptible operations as uninterruptible.
+ * In all cases, if a thread is interrupted during such a call, the call
+ * continues to block until the result is available or the timeout elapses,
+ * and only then re-interrupts the thread.
+ *
+ * @author Anthony Zana
+ * @since 10.0
+ */
@Beta
public final class Uninterruptibles {

  // Implementation Note: As of 3-7-11, the logic for each blocking/timeout
  // methods is identical, save for method being invoked.
  // Pattern: loop until the call succeeds, remembering any interrupt, then
  // restore the thread's interrupt status in the finally block.

  /**
   * Invokes {@code latch.}{@link CountDownLatch#await() await()}
   * uninterruptibly.
   */
  public static void awaitUninterruptibly(CountDownLatch latch) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          latch.await();
          return;
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Invokes
   * {@code latch.}{@link CountDownLatch#await(long, TimeUnit)
   * await(timeout, unit)} uninterruptibly.
   */
  public static boolean awaitUninterruptibly(CountDownLatch latch,
      long timeout, TimeUnit unit) {
    boolean interrupted = false;
    try {
      long remainingNanos = unit.toNanos(timeout);
      // Fixed deadline: each retry after an interrupt waits only for the time left.
      long end = System.nanoTime() + remainingNanos;

      while (true) {
        try {
          // CountDownLatch treats negative timeouts just like zero.
          return latch.await(remainingNanos, NANOSECONDS);
        } catch (InterruptedException e) {
          interrupted = true;
          remainingNanos = end - System.nanoTime();
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Invokes {@code toJoin.}{@link Thread#join() join()} uninterruptibly.
   */
  public static void joinUninterruptibly(Thread toJoin) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          toJoin.join();
          return;
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Invokes
   * {@code unit.}{@link TimeUnit#timedJoin(Thread, long)
   * timedJoin(toJoin, timeout)} uninterruptibly.
   */
  public static void joinUninterruptibly(Thread toJoin,
      long timeout, TimeUnit unit) {
    Preconditions.checkNotNull(toJoin);
    boolean interrupted = false;
    try {
      long remainingNanos = unit.toNanos(timeout);
      long end = System.nanoTime() + remainingNanos;
      while (true) {
        try {
          // TimeUnit.timedJoin() treats negative timeouts just like zero.
          NANOSECONDS.timedJoin(toJoin, remainingNanos);
          return;
        } catch (InterruptedException e) {
          interrupted = true;
          remainingNanos = end - System.nanoTime();
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Invokes {@code future.}{@link Future#get() get()} uninterruptibly.
   * To get uninterruptibility and remove checked exceptions, see
   * {@code Futures.getUnchecked} (in Guava's
   * {@code com.google.common.util.concurrent} package, not shaded here).
   *
   * <p>If instead, you wish to treat {@link InterruptedException} uniformly
   * with other exceptions, see {@code Futures.get(Future, Class)}
   * or {@code Futures.makeChecked}.
   *
   * @throws ExecutionException if the computation threw an exception
   * @throws CancellationException if the computation was cancelled
   */
  public static <V> V getUninterruptibly(Future<V> future)
      throws ExecutionException {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return future.get();
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Invokes
   * {@code future.}{@link Future#get(long, TimeUnit) get(timeout, unit)}
   * uninterruptibly.
   *
   * <p>If instead, you wish to treat {@link InterruptedException} uniformly
   * with other exceptions, see {@code Futures.get(Future, Class)}
   * or {@code Futures.makeChecked} (in Guava's
   * {@code com.google.common.util.concurrent} package, not shaded here).
   *
   * @throws ExecutionException if the computation threw an exception
   * @throws CancellationException if the computation was cancelled
   * @throws TimeoutException if the wait timed out
   */
  public static <V> V getUninterruptibly(
      Future<V> future, long timeout,  TimeUnit unit)
          throws ExecutionException, TimeoutException {
    boolean interrupted = false;
    try {
      long remainingNanos = unit.toNanos(timeout);
      long end = System.nanoTime() + remainingNanos;

      while (true) {
        try {
          // Future treats negative timeouts just like zero.
          return future.get(remainingNanos, NANOSECONDS);
        } catch (InterruptedException e) {
          interrupted = true;
          remainingNanos = end - System.nanoTime();
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Invokes {@code queue.}{@link BlockingQueue#take() take()} uninterruptibly.
   */
  public static <E> E takeUninterruptibly(BlockingQueue<E> queue) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return queue.take();
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Invokes {@code queue.}{@link BlockingQueue#put(Object) put(element)}
   * uninterruptibly.
   *
   * @throws ClassCastException if the class of the specified element prevents
   *     it from being added to the given queue
   * @throws IllegalArgumentException if some property of the specified element
   *     prevents it from being added to the given queue
   */
  public static <E> void putUninterruptibly(BlockingQueue<E> queue, E element) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          queue.put(element);
          return;
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  // TODO(user): Support Sleeper somehow (wrapper or interface method)?
  /**
   * Invokes {@code unit.}{@link TimeUnit#sleep(long) sleep(sleepFor)}
   * uninterruptibly. This is the method the shaded RateLimiter relies on.
   */
  public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
    boolean interrupted = false;
    try {
      long remainingNanos = unit.toNanos(sleepFor);
      long end = System.nanoTime() + remainingNanos;
      while (true) {
        try {
          // TimeUnit.sleep() treats negative timeouts just like zero.
          NANOSECONDS.sleep(remainingNanos);
          return;
        } catch (InterruptedException e) {
          interrupted = true;
          remainingNanos = end - System.nanoTime();
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Invokes {@code semaphore.}{@link Semaphore#tryAcquire(int, long, TimeUnit)
   * tryAcquire(1, timeout, unit)} uninterruptibly.
   *
   * @since 18.0
   */
  public static boolean tryAcquireUninterruptibly(
      Semaphore semaphore, long timeout, TimeUnit unit) {
    return tryAcquireUninterruptibly(semaphore, 1, timeout, unit);
  }

  /**
   * Invokes {@code semaphore.}{@link Semaphore#tryAcquire(int, long, TimeUnit)
   * tryAcquire(permits, timeout, unit)} uninterruptibly.
   *
   * @since 18.0
   */
  public static boolean tryAcquireUninterruptibly(
      Semaphore semaphore, int permits, long timeout, TimeUnit unit) {
    boolean interrupted = false;
    try {
      long remainingNanos = unit.toNanos(timeout);
      long end = System.nanoTime() + remainingNanos;

      while (true) {
        try {
          // Semaphore treats negative timeouts just like zero.
          return semaphore.tryAcquire(permits, remainingNanos, NANOSECONDS);
        } catch (InterruptedException e) {
          interrupted = true;
          remainingNanos = end - System.nanoTime();
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  // TODO(user): Add support for waitUninterruptibly.

  // Static utility class; not instantiable.
  private Uninterruptibles() {}
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/20338fc6/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
index 5bf3805..a7db6c4 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
@@ -82,6 +82,106 @@ public class TestStressSource {
     }
     verify(mockProcessor, times(35)).processEvent(getEvent(source));
   }
+  
+  @Test
+  public void testRateLimitedEventsNoBatch() throws InterruptedException,
+      EventDeliveryException {
+    StressSource source = new StressSource();
+    source.setChannelProcessor(mockProcessor);
+    Context context = new Context();
+    context.put("maxTotalEvents", "20");
+    context.put("maxEventsPerSecond", "20");
+    source.configure(context);
+
+    long startTime = System.currentTimeMillis();
+    source.start();
+
+    for (int i = 0; i < 20; i++) {
+      source.process();
+    }
+
+    long finishTime = System.currentTimeMillis();
+
+    //Expecting to see within a second +/- 30% for 20 events
+    Assert.assertTrue(finishTime - startTime < 1300);
+    Assert.assertTrue(finishTime - startTime > 700);
+    source.stop();
+  }
+  
+  @Test
+  public void testNonRateLimitedEventsNoBatch() throws InterruptedException,
+      EventDeliveryException {
+    StressSource source = new StressSource();
+    source.setChannelProcessor(mockProcessor);
+    Context context = new Context();
+
+    //Test with no limit - expect to see very fast performance
+    context = new Context();
+    context.put("maxTotalEvents", "20");
+    context.put("maxEventsPerSecond", "0");
+    source.configure(context);
+
+    long startTime = System.currentTimeMillis();
+    source.start();
+
+    for (int i = 0; i <= 20; i++) {
+      source.process();
+    }
+
+    long finishTime = System.currentTimeMillis();
+
+    Assert.assertTrue(finishTime - startTime < 70);
+  }
+  
+  @Test
+  public void testRateLimitedEventsBatch() throws InterruptedException,
+      EventDeliveryException {
+    StressSource source = new StressSource();
+    source.setChannelProcessor(mockProcessor);
+    Context context = new Context();
+    context.put("maxTotalEvents", "20");
+    context.put("maxEventsPerSecond", "20");
+    context.put("batchSize", "3");
+    source.configure(context);
+
+    long startTime = System.currentTimeMillis();
+    source.start();
+
+    for (int i = 0; i < 20; i++) {
+      source.process();
+    }
+
+    long finishTime = System.currentTimeMillis();
+
+    //Expecting to see within a second +/- 30% for 20 events
+    Assert.assertTrue(finishTime - startTime < 1300);
+    Assert.assertTrue(finishTime - startTime > 700);
+    source.stop();
+  }
+  
+  @Test
+  public void testNonRateLimitedEventsBatch() throws InterruptedException,
+      EventDeliveryException {
+    StressSource source = new StressSource();
+    source.setChannelProcessor(mockProcessor);
+    Context context = new Context();
+
+    //Test with no limit - expect to see very fast performance
+    context.put("maxTotalEvents", "20");
+    context.put("maxEventsPerSecond", "0");
+    source.configure(context);
+
+    long startTime = System.currentTimeMillis();
+    source.start();
+
+    for (int i = 0; i <= 20; i++) {
+      source.process();
+    }
+
+    long finishTime = System.currentTimeMillis();
+
+    Assert.assertTrue(finishTime - startTime < 70);
+  }
 
   @Test
   public void testBatchEvents() throws InterruptedException,

http://git-wip-us.apache.org/repos/asf/flume/blob/20338fc6/flume-ng-core/src/test/java/org/apache/flume/source/shaded/guava/TestRateLimiter.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/shaded/guava/TestRateLimiter.java
 
b/flume-ng-core/src/test/java/org/apache/flume/source/shaded/guava/TestRateLimiter.java
new file mode 100644
index 0000000..9cd1681
--- /dev/null
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/source/shaded/guava/TestRateLimiter.java
@@ -0,0 +1,468 @@
+package org.apache.flume.source.shaded.guava;
+/*
+ * Copyright (C) 2012 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//import static java.lang.reflect.Modifier.isStatic;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+//import com.google.common.collect.ImmutableClassToInstanceMap;
+//import com.google.common.collect.ImmutableSet;
+//import com.google.common.collect.Lists;
+//import com.google.common.testing.NullPointerTester;
+//import com.google.common.testing.NullPointerTester.Visibility;
+import org.apache.flume.source.shaded.guava.RateLimiter.SleepingStopwatch;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.TestCase;
+
+//import org.easymock.EasyMock;
+//import org.mockito.Mockito;
+
+//import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+//import java.util.concurrent.TimeUnit;
+
/**
 * Tests for RateLimiter (shaded copy of Guava's RateLimiterTest).
 *
 * <p>All timing is driven by {@link FakeStopwatch}, a deterministic fake
 * clock, so these tests never depend on wall-clock time. Expected behavior
 * is expressed as event strings: {@code R<sec>} is a delay imposed by the
 * (R)ateLimiter, {@code U<sec>} is time the (U)ser advanced the clock.
 *
 * @author Dimitris Andreou
 */
public class TestRateLimiter extends TestCase {
  private static final double EPSILON = 1e-8;

  // Deterministic fake clock shared by all tests in this class.
  private final FakeStopwatch stopwatch = new FakeStopwatch();

  public void testSimple() {
    RateLimiter limiter = RateLimiter.create(stopwatch, 5.0);
    limiter.acquire(); // R0.00, since it's the first request
    limiter.acquire(); // R0.20
    limiter.acquire(); // R0.20
    assertEvents("R0.00", "R0.20", "R0.20");
  }

  public void testImmediateTryAcquire() {
    RateLimiter r = RateLimiter.create(1);
    assertTrue("Unable to acquire initial permit", r.tryAcquire());
    assertFalse("Capable of acquiring secondary permit", r.tryAcquire());
  }

  public void testSimpleRateUpdate() {
    RateLimiter limiter = RateLimiter.create(5.0, 5, SECONDS);
    assertEquals(5.0, limiter.getRate());
    limiter.setRate(10.0);
    assertEquals(10.0, limiter.getRate());

    // Zero or negative rates must be rejected.
    try {
      limiter.setRate(0.0);
      fail();
    } catch (IllegalArgumentException expected) { }
    try {
      limiter.setRate(-10.0);
      fail();
    } catch (IllegalArgumentException expected) { }
  }

  public void testAcquireParameterValidation() {
    // Zero or negative permit counts must be rejected by every entry point.
    RateLimiter limiter = RateLimiter.create(999);
    try {
      limiter.acquire(0);
      fail();
    } catch (IllegalArgumentException expected) {
    }
    try {
      limiter.acquire(-1);
      fail();
    } catch (IllegalArgumentException expected) {
    }
    try {
      limiter.tryAcquire(0);
      fail();
    } catch (IllegalArgumentException expected) {
    }
    try {
      limiter.tryAcquire(-1);
      fail();
    } catch (IllegalArgumentException expected) {
    }
    try {
      limiter.tryAcquire(0, 1, SECONDS);
      fail();
    } catch (IllegalArgumentException expected) {
    }
    try {
      limiter.tryAcquire(-1, 1, SECONDS);
      fail();
    } catch (IllegalArgumentException expected) {
    }
  }

  public void testSimpleWithWait() {
    RateLimiter limiter = RateLimiter.create(stopwatch, 5.0);
    limiter.acquire();          // R0.00
    stopwatch.sleepMillis(200); // U0.20, we are ready for the next request...
    limiter.acquire();          // R0.00, ...which is granted immediately
    limiter.acquire();          // R0.20
    assertEvents("R0.00", "U0.20", "R0.00", "R0.20");
  }

  public void testSimpleAcquireReturnValues() {
    RateLimiter limiter = RateLimiter.create(stopwatch, 5.0);
    assertEquals(0.0, limiter.acquire(), EPSILON);  // R0.00
    stopwatch.sleepMillis(200);                     // U0.20, we are ready for the next request...
    assertEquals(0.0, limiter.acquire(), EPSILON);  // R0.00, ...which is granted immediately
    assertEquals(0.2, limiter.acquire(), EPSILON);  // R0.20
    assertEvents("R0.00", "U0.20", "R0.00", "R0.20");
  }

  public void testSimpleAcquireEarliestAvailableIsInPast() {
    RateLimiter limiter = RateLimiter.create(stopwatch, 5.0);
    assertEquals(0.0, limiter.acquire(), EPSILON);
    stopwatch.sleepMillis(400);
    assertEquals(0.0, limiter.acquire(), EPSILON);
    assertEquals(0.0, limiter.acquire(), EPSILON);
    assertEquals(0.2, limiter.acquire(), EPSILON);
  }

  public void testOneSecondBurst() {
    RateLimiter limiter = RateLimiter.create(stopwatch, 5.0);
    stopwatch.sleepMillis(1000); // max capacity reached
    stopwatch.sleepMillis(1000); // this makes no difference
    limiter.acquire(1); // R0.00, since it's the first request

    limiter.acquire(1); // R0.00, from capacity
    limiter.acquire(3); // R0.00, from capacity
    limiter.acquire(1); // R0.00, concluding a burst of 5 permits

    limiter.acquire(); // R0.20, capacity exhausted
    assertEvents("U1.00", "U1.00",
        "R0.00", "R0.00", "R0.00", "R0.00", // first request and burst
        "R0.20");
  }

  public void testCreateWarmupParameterValidation() {
    RateLimiter.create(1.0, 1, NANOSECONDS);
    RateLimiter.create(1.0, 0, NANOSECONDS);

    // Non-positive rate and negative warmup period must be rejected.
    try {
      RateLimiter.create(0.0, 1, NANOSECONDS);
      fail();
    } catch (IllegalArgumentException expected) {
    }

    try {
      RateLimiter.create(1.0, -1, NANOSECONDS);
      fail();
    } catch (IllegalArgumentException expected) {
    }
  }

  public void testWarmUp() {
    RateLimiter limiter = RateLimiter.create(stopwatch, 2.0, 4000, MILLISECONDS);
    for (int i = 0; i < 8; i++) {
      limiter.acquire(); // #1
    }
    stopwatch.sleepMillis(500); // #2: to repay for the last acquire
    stopwatch.sleepMillis(4000); // #3: becomes cold again
    for (int i = 0; i < 8; i++) {
      limiter.acquire(); // #4
    }
    stopwatch.sleepMillis(500); // #5: to repay for the last acquire
    stopwatch.sleepMillis(2000); // #6: didn't get cold! It would take another 2 seconds to go cold
    for (int i = 0; i < 8; i++) {
      limiter.acquire(); // #7
    }
    assertEvents(
        "R0.00, R1.38, R1.13, R0.88, R0.63, R0.50, R0.50, R0.50", // #1
        "U0.50", // #2
        "U4.00", // #3
        "R0.00, R1.38, R1.13, R0.88, R0.63, R0.50, R0.50, R0.50", // #4
        "U0.50", // #5
        "U2.00", // #6
        "R0.00, R0.50, R0.50, R0.50, R0.50, R0.50, R0.50, R0.50"); // #7
  }

  public void testWarmUpAndUpdate() {
    RateLimiter limiter = RateLimiter.create(stopwatch, 2.0, 4000, MILLISECONDS);
    for (int i = 0; i < 8; i++) {
      limiter.acquire(); // #1
    }
    stopwatch.sleepMillis(4500); // #2: back to cold state (warmup period + repay last acquire)
    for (int i = 0; i < 3; i++) { // only three steps, we're somewhere in the warmup period
      limiter.acquire(); // #3
    }

    limiter.setRate(4.0); // double the rate!
    limiter.acquire(); // #4, we repay the debt of the last acquire (imposed by the old rate)
    for (int i = 0; i < 4; i++) {
      limiter.acquire(); // #5
    }
    stopwatch.sleepMillis(4250); // #6, back to cold state (warmup period + repay last acquire)
    for (int i = 0; i < 11; i++) {
      limiter.acquire(); // #7, showing off the warmup starting from totally cold
    }

    // make sure the areas (times) remain the same, while permits are different
    assertEvents(
        "R0.00, R1.38, R1.13, R0.88, R0.63, R0.50, R0.50, R0.50", // #1
        "U4.50", // #2
        "R0.00, R1.38, R1.13", // #3, after that the rate changes
        "R0.88", // #4, this is what the throttling would be with the old rate
        "R0.34, R0.28, R0.25, R0.25", // #5
        "U4.25", // #6
        "R0.00, R0.72, R0.66, R0.59, R0.53, R0.47, R0.41", // #7
        "R0.34, R0.28, R0.25, R0.25"); // #7 (cont.), note, this matches #5
  }

  public void testBurstyAndUpdate() {
    RateLimiter rateLimiter = RateLimiter.create(stopwatch, 1.0);
    rateLimiter.acquire(1); // no wait
    rateLimiter.acquire(1); // R1.00, to repay previous

    rateLimiter.setRate(2.0); // update the rate!

    rateLimiter.acquire(1); // R1.00, to repay previous (the previous was under the old rate!)
    rateLimiter.acquire(2); // R0.50, to repay previous (now the rate takes effect)
    rateLimiter.acquire(4); // R1.00, to repay previous
    rateLimiter.acquire(1); // R2.00, to repay previous
    assertEvents("R0.00", "R1.00", "R1.00", "R0.50", "R1.00", "R2.00");
  }

  public void testTryAcquire_noWaitAllowed() {
    RateLimiter limiter = RateLimiter.create(stopwatch, 5.0);
    assertTrue(limiter.tryAcquire(0, SECONDS));
    assertFalse(limiter.tryAcquire(0, SECONDS));
    assertFalse(limiter.tryAcquire(0, SECONDS));
    stopwatch.sleepMillis(100);
    assertFalse(limiter.tryAcquire(0, SECONDS));
  }

  public void testTryAcquire_someWaitAllowed() {
    RateLimiter limiter = RateLimiter.create(stopwatch, 5.0);
    assertTrue(limiter.tryAcquire(0, SECONDS));
    assertTrue(limiter.tryAcquire(200, MILLISECONDS));
    assertFalse(limiter.tryAcquire(100, MILLISECONDS));
    stopwatch.sleepMillis(100);
    assertTrue(limiter.tryAcquire(100, MILLISECONDS));
  }

  public void testTryAcquire_overflow() {
    // A huge timeout must not overflow the internal nanosecond arithmetic.
    RateLimiter limiter = RateLimiter.create(stopwatch, 5.0);
    assertTrue(limiter.tryAcquire(0, MICROSECONDS));
    stopwatch.sleepMillis(100);
    assertTrue(limiter.tryAcquire(Long.MAX_VALUE, MICROSECONDS));
  }

  public void testTryAcquire_negative() {
    // Negative timeouts are treated like zero (no waiting allowed).
    RateLimiter limiter = RateLimiter.create(stopwatch, 5.0);
    assertTrue(limiter.tryAcquire(5, 0, SECONDS));
    stopwatch.sleepMillis(900);
    assertFalse(limiter.tryAcquire(1, Long.MIN_VALUE, SECONDS));
    stopwatch.sleepMillis(100);
    assertTrue(limiter.tryAcquire(1, -1, SECONDS));
  }

  public void testSimpleWeights() {
    RateLimiter rateLimiter = RateLimiter.create(stopwatch, 1.0);
    rateLimiter.acquire(1); // no wait
    rateLimiter.acquire(1); // R1.00, to repay previous
    rateLimiter.acquire(2); // R1.00, to repay previous
    rateLimiter.acquire(4); // R2.00, to repay previous
    rateLimiter.acquire(8); // R4.00, to repay previous
    rateLimiter.acquire(1); // R8.00, to repay previous
    assertEvents("R0.00", "R1.00", "R1.00", "R2.00", "R4.00", "R8.00");
  }

  public void testInfinity_Bursty() {
    RateLimiter limiter = RateLimiter.create(stopwatch, Double.POSITIVE_INFINITY);
    limiter.acquire(Integer.MAX_VALUE / 4);
    limiter.acquire(Integer.MAX_VALUE / 2);
    limiter.acquire(Integer.MAX_VALUE);
    assertEvents("R0.00", "R0.00", "R0.00"); // no wait, infinite rate!

    limiter.setRate(2.0);
    limiter.acquire();
    limiter.acquire();
    limiter.acquire();
    limiter.acquire();
    limiter.acquire();
    assertEvents(
        "R0.00", // First comes the saved-up burst, which defaults to a 1-second burst (2 requests).
        "R0.00",
        "R0.00", // Now comes the free request.
        "R0.50", // Now it's 0.5 seconds per request.
        "R0.50");

    limiter.setRate(Double.POSITIVE_INFINITY);
    limiter.acquire();
    limiter.acquire();
    limiter.acquire();
    assertEvents("R0.50", "R0.00", "R0.00"); // we repay the last request (.5sec), then back to +oo
  }

  /** https://code.google.com/p/guava-libraries/issues/detail?id=1791 */
  // NOTE: "Busty" is a typo for "Bursty" inherited from the upstream Guava
  // test; kept as-is to stay in sync with the shaded source.
  public void testInfinity_BustyTimeElapsed() {
    RateLimiter limiter = RateLimiter.create(stopwatch, Double.POSITIVE_INFINITY);
    stopwatch.instant += 1000000;
    limiter.setRate(2.0);
    for (int i = 0; i < 5; i++) {
      limiter.acquire();
    }
    assertEvents(
        "R0.00", // First comes the saved-up burst, which defaults to a 1-second burst (2 requests).
        "R0.00",
        "R0.00", // Now comes the free request.
        "R0.50", // Now it's 0.5 seconds per request.
        "R0.50");
  }

  public void testInfinity_WarmUp() {
    RateLimiter limiter = RateLimiter.create(
        stopwatch, Double.POSITIVE_INFINITY, 10, SECONDS);
    limiter.acquire(Integer.MAX_VALUE / 4);
    limiter.acquire(Integer.MAX_VALUE / 2);
    limiter.acquire(Integer.MAX_VALUE);
    assertEvents("R0.00", "R0.00", "R0.00");

    limiter.setRate(1.0);
    limiter.acquire();
    limiter.acquire();
    limiter.acquire();
    assertEvents("R0.00", "R1.00", "R1.00");

    limiter.setRate(Double.POSITIVE_INFINITY);
    limiter.acquire();
    limiter.acquire();
    limiter.acquire();
    assertEvents("R1.00", "R0.00", "R0.00");
  }

  public void testInfinity_WarmUpTimeElapsed() {
    RateLimiter limiter = RateLimiter.create(stopwatch, Double.POSITIVE_INFINITY, 10, SECONDS);
    stopwatch.instant += 1000000;
    limiter.setRate(1.0);
    for (int i = 0; i < 5; i++) {
      limiter.acquire();
    }
    assertEvents("R0.00", "R1.00", "R1.00", "R1.00", "R1.00");
  }

  /**
   * Make sure that bursts can never go above 1-second-worth-of-work for the current
   * rate, even when we change the rate.
   */
  public void testWeNeverGetABurstMoreThanOneSec() {
    RateLimiter limiter = RateLimiter.create(stopwatch, 1.0);
    int[] rates = { 1000, 1, 10, 1000000, 10, 1};
    for (int rate : rates) {
      int oneSecWorthOfWork = rate;
      stopwatch.sleepMillis(rate * 1000);
      limiter.setRate(rate);
      long burst = measureTotalTimeMillis(limiter, oneSecWorthOfWork, new Random());
      // we allow one second worth of work to go in a burst (i.e. take less than a second)
      assertTrue(burst <= 1000);
      long afterBurst = measureTotalTimeMillis(limiter, oneSecWorthOfWork, new Random());
      // but work beyond that must take at least one second
      assertTrue(afterBurst >= 1000);
    }
  }

  /**
   * This neat test shows that no matter what weights we use in our requests, if we push X
   * amount of permits in a cool state, where X = rate * timeToCoolDown, and we have
   * specified a timeToWarmUp() period, it will cost as the prescribed amount of time. E.g.,
   * calling [acquire(5), acquire(1)] takes exactly the same time as
   * [acquire(2), acquire(3), acquire(1)].
   */
  public void testTimeToWarmUpIsHonouredEvenWithWeights() {
    Random random = new Random();
    int maxPermits = 10;
    double[] qpsToTest = { 4.0, 2.0, 1.0, 0.5, 0.1 };
    for (int trial = 0; trial < 100; trial++) {
      for (double qps : qpsToTest) {
        // Since we know that: maxPermits = 0.5 * warmup / stableInterval;
        // then if maxPermits == 10, we have:
        // warmupSeconds = 20 / qps
        long warmupMillis = (long) ((2 * maxPermits / qps) * 1000.0);
        RateLimiter rateLimiter = RateLimiter.create(
            stopwatch, qps, warmupMillis, MILLISECONDS);
        assertEquals(warmupMillis, measureTotalTimeMillis(rateLimiter, maxPermits, random));
      }
    }
  }

  // Acquires `permits` permits in randomly-sized chunks and returns how long
  // (in fake-clock millis) the rate limiter made us wait in total.
  private long measureTotalTimeMillis(RateLimiter rateLimiter, int permits, Random random) {
    long startTime = stopwatch.instant;
    while (permits > 0) {
      int nextPermitsToAcquire = Math.max(1, random.nextInt(permits));
      permits -= nextPermitsToAcquire;
      rateLimiter.acquire(nextPermitsToAcquire);
    }
    rateLimiter.acquire(1); // to repay for any pending debt
    return NANOSECONDS.toMillis(stopwatch.instant - startTime);
  }

  // Asserts the exact sequence of R/U events recorded since the last check,
  // then clears the recorded events.
  private void assertEvents(String... events) {
    assertEquals(Arrays.toString(events), stopwatch.readEventsAndClear());
  }

  /**
   * The stopwatch gathers events and presents them as strings.
   * R0.6 means a delay of 0.6 seconds caused by the (R)ateLimiter
   * U1.0 means the (U)ser caused the stopwatch to sleep for a second.
   */
  static class FakeStopwatch extends SleepingStopwatch {
    // Fake "now" in nanoseconds; advanced instead of actually sleeping.
    long instant = 0L;
    final List<String> events = Lists.newArrayList();

    @Override
    public long readMicros() {
      return NANOSECONDS.toMicros(instant);
    }

    void sleepMillis(int millis) {
      sleepMicros("U", MILLISECONDS.toMicros(millis));
    }

    void sleepMicros(String caption, long micros) {
      instant += MICROSECONDS.toNanos(micros);
      events.add(caption + String.format("%3.2f", (micros / 1000000.0)));
    }

    @Override
    void sleepMicrosUninterruptibly(long micros) {
      // The limiter's "sleep" just advances the fake clock and records it.
      sleepMicros("R", micros);
    }

    String readEventsAndClear() {
      try {
        return events.toString();
      } finally {
        events.clear();
      }
    }

    @Override
    public String toString() {
      return events.toString();
    }
  }
}

http://git-wip-us.apache.org/repos/asf/flume/blob/20338fc6/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index af91f9e..3c93f52 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1845,6 +1845,7 @@ size                 500          Payload size of each 
Event. Unit:**byte**
 maxTotalEvents       -1           Maximum number of Events to be sent
 maxSuccessfulEvents  -1           Maximum number of Events successfully sent
 batchSize            1            Number of Events to be sent in one batch
+maxEventsPerSecond   0            When set to an integer greater than zero, 
rate limits the source to that many events per second.
 ===================  ===========  
===================================================
 
 Example for agent named **a1**:

Reply via email to