This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/main by this push:
new 8e51eae97d Refactor TimeBucketCounter to support alternative
implementations
8e51eae97d is described below
commit 8e51eae97ded75641318ea18768a47a14165c6ad
Author: Mark Thomas <[email protected]>
AuthorDate: Fri Mar 7 13:27:58 2025 +0000
Refactor TimeBucketCounter to support alternative implementations
This is preparatory work for PR #794
---
.../apache/catalina/util/TimeBucketCounter.java | 185 +++++++--------------
...cketCounter.java => TimeBucketCounterBase.java} | 177 ++++++++++----------
2 files changed, 145 insertions(+), 217 deletions(-)
diff --git a/java/org/apache/catalina/util/TimeBucketCounter.java
b/java/org/apache/catalina/util/TimeBucketCounter.java
index 3b4726f7ff..78951623cc 100644
--- a/java/org/apache/catalina/util/TimeBucketCounter.java
+++ b/java/org/apache/catalina/util/TimeBucketCounter.java
@@ -14,122 +14,91 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.catalina.util;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-import org.apache.tomcat.util.res.StringManager;
/**
- * This class maintains a thread safe hash map that has timestamp-based
buckets followed by a string for a key, and a
- * counter for a value. each time the increment() method is called it adds the
key if it does not exist, increments its
- * value and returns it. a maintenance thread cleans up keys that are prefixed
by previous timestamp buckets.
+ * A fast counter that optimizes efficiency at the expense of approximate
bucket indexing.
*/
-public class TimeBucketCounter {
-
- private static final Log log = LogFactory.getLog(TimeBucketCounter.class);
- private static final StringManager sm =
StringManager.getManager(TimeBucketCounter.class);
-
- /**
- * Map to hold the buckets
- */
- private final ConcurrentHashMap<String,AtomicInteger> map = new
ConcurrentHashMap<>();
+public class TimeBucketCounter extends TimeBucketCounterBase {
- /**
- * Milliseconds bucket size as a Power of 2 for bit shift math, e.g. 16
for 65_536ms which is about 1:05 minute
- */
+ // Milliseconds bucket size as a Power of 2 for bit shift math, e.g. 16
for 65_536ms which is about 1:05 minute.
private final int numBits;
- /**
- * Ratio of actual duration to config duration
- */
+ // Ratio of actual duration to config duration
private final double ratio;
- /**
- * The future allowing control of the background processor.
- */
- private ScheduledFuture<?> maintenanceFuture;
- private ScheduledFuture<?> monitorFuture;
- private final ScheduledExecutorService executorService;
- private final long sleeptime;
- /**
- * Creates a new TimeBucketCounter with the specified lifetime.
- *
- * @param bucketDuration duration in seconds, e.g. for 1 minute pass 60
- * @param executorService the executor service which will be used to run
the maintenance
- */
public TimeBucketCounter(int bucketDuration, ScheduledExecutorService
executorService) {
+ super(getActualDuration(bucketDuration), executorService);
+ this.numBits = determineShiftBitsOfDuration(bucketDuration);
+ this.ratio = ratioToPowerOf2(bucketDuration * 1000);
+ }
- this.executorService = executorService;
-
- int durationMillis = bucketDuration * 1000;
-
- int bits = 0;
- int pof2 = nextPowerOf2(durationMillis);
- int bitCheck = pof2;
- while (bitCheck > 1) {
- bitCheck = pof2 >> ++bits;
- }
-
- this.numBits = bits;
- this.ratio = ratioToPowerOf2(durationMillis);
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Calculates the current time bucket index by shifting bits for fast
division, e.g. shift 16 bits is the same as
+ * dividing by 65,536 which is about 1:05m.
+ */
+ @Override
+ public long getBucketIndex(long timestamp) {
+ return timestamp >> this.numBits;
+ }
- int cleanupsPerBucketDuration = (durationMillis >= 60_000) ? 6 : 3;
- sleeptime = durationMillis / cleanupsPerBucketDuration;
- // Start our thread
- if (sleeptime > 0) {
- monitorFuture = executorService.scheduleWithFixedDelay(new
MaintenanceMonitor(), 0, 60, TimeUnit.SECONDS);
- }
+ public int getNumBits() {
+ return numBits;
}
+
/**
- * Increments the counter for the passed identifier in the current time
bucket and returns the new value.
- *
- * @param identifier an identifier for which we want to maintain count,
e.g. IP Address
+ * The actual duration may differ from the configured duration because it
is set to the next power of 2 value in
+ * order to perform very fast bit shift arithmetic.
*
- * @return the count within the current time bucket
+ * @return the actual bucket duration in milliseconds
*/
- public final int increment(String identifier) {
- String key = getCurrentBucketPrefix() + "-" + identifier;
- AtomicInteger ai = map.computeIfAbsent(key, v -> new AtomicInteger());
- return ai.incrementAndGet();
+ public int getActualDuration() {
+ return (int) Math.pow(2, getNumBits());
}
+
/**
- * Calculates the current time bucket prefix by shifting bits for fast
division, e.g. shift 16 bits is the same as
- * dividing by 65,536 which is about 1:05m.
+ * Determines the bits of shift for the specific bucket duration in
seconds, which used to figure out the correct
+ * bucket index.
+ *
+ * @param duration bucket duration in seconds
*
- * @return The current bucket prefix.
+ * @return bits to be shifted
*/
- public final int getCurrentBucketPrefix() {
- return (int) (System.currentTimeMillis() >> this.numBits);
+ protected static final int determineShiftBitsOfDuration(int duration) {
+ int bits = 0;
+ int pof2 = nextPowerOf2(duration * 1000);
+ int bitCheck = pof2;
+ while (bitCheck > 1) {
+ bitCheck = pof2 >> ++bits;
+ }
+ return bits;
}
- public int getNumBits() {
- return numBits;
- }
/**
* The actual duration may differ from the configured duration because it
is set to the next power of 2 value in
* order to perform very fast bit shift arithmetic.
*
- * @return the actual bucket duration in milliseconds
+ * @param duration in seconds
+ *
+ * @return the actual bucket duration in seconds
+ *
+ * @see FastTimeBucketCounter#determineShiftBitsOfDuration(int)
*/
- public int getActualDuration() {
- return (int) Math.pow(2, getNumBits());
+ private static int getActualDuration(int duration) {
+ return (int) (1L << determineShiftBitsOfDuration(duration)) / 1000;
}
+
/**
* Returns the ratio between the configured duration param and the actual
duration which will be set to the next
* power of 2. We then multiply the configured requests param by the same
ratio in order to compensate for the added
@@ -137,18 +106,25 @@ public class TimeBucketCounter {
*
* @return the ratio, e.g. 1.092 if the actual duration is 65_536 for the
configured duration of 60_000
*/
+ @Override
public double getRatio() {
return ratio;
}
+
/**
* Returns the ratio to the next power of 2 so that we can adjust the
value.
+ *
+ * @param value of target duration in seconds
+ *
+ * @return the ratio to the next power of 2 so that we can adjust the value
*/
static double ratioToPowerOf2(int value) {
double nextPO2 = nextPowerOf2(value);
return Math.round((1000 * nextPO2 / value)) / 1000d;
}
+
/**
* Returns the next power of 2 given a value, e.g. 256 for 250, or 1024,
for 1000.
*/
@@ -161,59 +137,12 @@ public class TimeBucketCounter {
return valueOfHighestBit << 1;
}
- /**
- * When we want to test a full bucket duration we need to sleep until the
next bucket starts.
- *
- * @return the number of milliseconds until the next bucket
- */
+
+ @Override
public long getMillisUntilNextBucket() {
long millis = System.currentTimeMillis();
long nextTimeBucketMillis = ((millis + (long) Math.pow(2, numBits)) >>
numBits) << numBits;
long delta = nextTimeBucketMillis - millis;
return delta;
}
-
- /**
- * Sets isRunning to false to terminate the maintenance thread.
- */
- public void destroy() {
- // Stop our thread
- if (monitorFuture != null) {
- monitorFuture.cancel(true);
- monitorFuture = null;
- }
- if (maintenanceFuture != null) {
- maintenanceFuture.cancel(true);
- maintenanceFuture = null;
- }
- }
-
- private class Maintenance implements Runnable {
- @Override
- public void run() {
- String currentBucketPrefix =
String.valueOf(getCurrentBucketPrefix());
- ConcurrentHashMap.KeySetView<String,AtomicInteger> keys =
map.keySet();
- // remove obsolete keys
- keys.removeIf(k -> !k.startsWith(currentBucketPrefix));
- }
- }
-
- private class MaintenanceMonitor implements Runnable {
- @Override
- public void run() {
- if (sleeptime > 0 && (maintenanceFuture == null ||
maintenanceFuture.isDone())) {
- if (maintenanceFuture != null && maintenanceFuture.isDone()) {
- // There was an error executing the scheduled task, get it
and log it
- try {
- maintenanceFuture.get();
- } catch (InterruptedException | ExecutionException e) {
-
log.error(sm.getString("timebucket.maintenance.error"), e);
- }
- }
- maintenanceFuture = executorService.scheduleWithFixedDelay(new
Maintenance(), sleeptime, sleeptime,
- TimeUnit.MILLISECONDS);
- }
- }
- }
-
}
diff --git a/java/org/apache/catalina/util/TimeBucketCounter.java
b/java/org/apache/catalina/util/TimeBucketCounterBase.java
similarity index 54%
copy from java/org/apache/catalina/util/TimeBucketCounter.java
copy to java/org/apache/catalina/util/TimeBucketCounterBase.java
index 3b4726f7ff..b2b0bbee09 100644
--- a/java/org/apache/catalina/util/TimeBucketCounter.java
+++ b/java/org/apache/catalina/util/TimeBucketCounterBase.java
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.catalina.util;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
@@ -30,62 +30,42 @@ import org.apache.tomcat.util.res.StringManager;
/**
* This class maintains a thread safe hash map that has timestamp-based
buckets followed by a string for a key, and a
- * counter for a value. each time the increment() method is called it adds the
key if it does not exist, increments its
- * value and returns it. a maintenance thread cleans up keys that are prefixed
by previous timestamp buckets.
+ * counter for an integer value. Each time the increment() method is called it
adds the key if it does not exist,
+ * increments its value and returns it.
*/
-public class TimeBucketCounter {
+public abstract class TimeBucketCounterBase {
- private static final Log log = LogFactory.getLog(TimeBucketCounter.class);
- private static final StringManager sm =
StringManager.getManager(TimeBucketCounter.class);
+ private static final Log log =
LogFactory.getLog(TimeBucketCounterBase.class);
+ private static final StringManager sm =
StringManager.getManager(TimeBucketCounterBase.class);
- /**
- * Map to hold the buckets
- */
- private final ConcurrentHashMap<String,AtomicInteger> map = new
ConcurrentHashMap<>();
-
- /**
- * Milliseconds bucket size as a Power of 2 for bit shift math, e.g. 16
for 65_536ms which is about 1:05 minute
- */
- private final int numBits;
+ private static final String BUCKET_KEY_DELIMITER = "-";
- /**
- * Ratio of actual duration to config duration
- */
- private final double ratio;
+ // Map to hold the buckets
+ private final ConcurrentHashMap<String,AtomicInteger> map = new
ConcurrentHashMap<>();
- /**
- * The future allowing control of the background processor.
- */
+ // The future allowing control of the background processor.
private ScheduledFuture<?> maintenanceFuture;
private ScheduledFuture<?> monitorFuture;
private final ScheduledExecutorService executorService;
private final long sleeptime;
+ private int bucketDuration;
+
/**
* Creates a new TimeBucketCounter with the specified lifetime.
*
* @param bucketDuration duration in seconds, e.g. for 1 minute pass 60
- * @param executorService the executor service which will be used to run
the maintenance
+ * @param executorService the executor service that will be used to run
the maintenance task
+ *
+ * @throws NullPointerException if executorService is <code>null</code>.
*/
- public TimeBucketCounter(int bucketDuration, ScheduledExecutorService
executorService) {
-
+ public TimeBucketCounterBase(int bucketDuration, ScheduledExecutorService
executorService) {
+ Objects.requireNonNull(executorService);
this.executorService = executorService;
+ this.bucketDuration = bucketDuration;
- int durationMillis = bucketDuration * 1000;
-
- int bits = 0;
- int pof2 = nextPowerOf2(durationMillis);
- int bitCheck = pof2;
- while (bitCheck > 1) {
- bitCheck = pof2 >> ++bits;
- }
-
- this.numBits = bits;
-
- this.ratio = ratioToPowerOf2(durationMillis);
-
- int cleanupsPerBucketDuration = (durationMillis >= 60_000) ? 6 : 3;
- sleeptime = durationMillis / cleanupsPerBucketDuration;
+ int cleanupsPerBucketDuration = (bucketDuration >= 60) ? 6 : 3;
+ sleeptime = bucketDuration * 1000 / cleanupsPerBucketDuration;
// Start our thread
if (sleeptime > 0) {
@@ -93,6 +73,23 @@ public class TimeBucketCounter {
}
}
+
+ /**
+ * @return bucketDuration in seconds
+ */
+ public int getBucketDuration() {
+ return bucketDuration;
+ }
+
+
+ /**
+ * Returns the ratio between the configured duration param and the actual
duration.
+ *
+ * @return the ratio between the configured duration param and the actual
duration.
+ */
+ public abstract double getRatio();
+
+
/**
* Increments the counter for the passed identifier in the current time
bucket and returns the new value.
*
@@ -101,83 +98,76 @@ public class TimeBucketCounter {
* @return the count within the current time bucket
*/
public final int increment(String identifier) {
- String key = getCurrentBucketPrefix() + "-" + identifier;
+ String key = genKey(identifier);
AtomicInteger ai = map.computeIfAbsent(key, v -> new AtomicInteger());
return ai.incrementAndGet();
}
+
/**
- * Calculates the current time bucket prefix by shifting bits for fast
division, e.g. shift 16 bits is the same as
- * dividing by 65,536 which is about 1:05m.
+ * Generates the key of timeBucket counter maps with the specific
identifier, and the timestamp is implicitly
+ * equivalent to "now".
*
- * @return The current bucket prefix.
+ * @param identifier an identifier for which we want to maintain count
+ *
+ * @return key of timeBucket counter maps
*/
- public final int getCurrentBucketPrefix() {
- return (int) (System.currentTimeMillis() >> this.numBits);
+ protected final String genKey(String identifier) {
+ return genKey(identifier, System.currentTimeMillis());
}
- public int getNumBits() {
- return numBits;
- }
/**
- * The actual duration may differ from the configured duration because it
is set to the next power of 2 value in
- * order to perform very fast bit shift arithmetic.
+ * Generates the key of timeBucket counter maps with the specific
identifier and timestamp.
+ *
+ * @param identifier of target request
+ * @param timestamp when target request received
*
- * @return the actual bucket duration in milliseconds
+ * @return key of timeBucket counter maps
*/
- public int getActualDuration() {
- return (int) Math.pow(2, getNumBits());
+ protected final String genKey(String identifier, long timestamp) {
+ return getBucketIndex(timestamp) + BUCKET_KEY_DELIMITER + identifier;
}
+
/**
- * Returns the ratio between the configured duration param and the actual
duration which will be set to the next
- * power of 2. We then multiply the configured requests param by the same
ratio in order to compensate for the added
- * time, if any.
+ * Calculate the bucket index for the specific timestamp.
+ *
+ * @param timestamp the specific timestamp in milliseconds
*
- * @return the ratio, e.g. 1.092 if the actual duration is 65_536 for the
configured duration of 60_000
+ * @return prefix the bucket key prefix for the specific timestamp
*/
- public double getRatio() {
- return ratio;
- }
+ protected abstract long getBucketIndex(long timestamp);
- /**
- * Returns the ratio to the next power of 2 so that we can adjust the
value.
- */
- static double ratioToPowerOf2(int value) {
- double nextPO2 = nextPowerOf2(value);
- return Math.round((1000 * nextPO2 / value)) / 1000d;
- }
/**
- * Returns the next power of 2 given a value, e.g. 256 for 250, or 1024,
for 1000.
+ * Returns current bucket prefix
+ *
+ * @return bucket index
*/
- static int nextPowerOf2(int value) {
- int valueOfHighestBit = Integer.highestOneBit(value);
- if (valueOfHighestBit == value) {
- return value;
- }
-
- return valueOfHighestBit << 1;
+ public int getCurrentBucketPrefix() {
+ return (int) getBucketIndex(System.currentTimeMillis());
}
+
/**
* When we want to test a full bucket duration we need to sleep until the
next bucket starts.
+ * <p>
+ * <strong>WARNING:</strong> This method is used for test purpose.
*
* @return the number of milliseconds until the next bucket
+ *
+ * @deprecated Will be made package private in Tomcat 12 onwards.
*/
- public long getMillisUntilNextBucket() {
- long millis = System.currentTimeMillis();
- long nextTimeBucketMillis = ((millis + (long) Math.pow(2, numBits)) >>
numBits) << numBits;
- long delta = nextTimeBucketMillis - millis;
- return delta;
- }
+ @Deprecated
+ public abstract long getMillisUntilNextBucket();
+
/**
- * Sets isRunning to false to terminate the maintenance thread.
+ * Stops threads created by this object and cleans up resources.
*/
public void destroy() {
- // Stop our thread
+ map.clear();
if (monitorFuture != null) {
monitorFuture.cancel(true);
monitorFuture = null;
@@ -188,13 +178,23 @@ public class TimeBucketCounter {
}
}
+
+ /**
+ * Periodic evict, perform removal of obsolete bucket items. Absence of
this operation may result in OOM after a
+ * long run.
+ */
+ public void periodicEvict() {
+ String currentBucketPrefix = String.valueOf(getCurrentBucketPrefix());
+ ConcurrentHashMap.KeySetView<String,AtomicInteger> keys = map.keySet();
+ // remove obsolete keys
+ keys.removeIf(k -> !k.startsWith(currentBucketPrefix));
+ }
+
+
private class Maintenance implements Runnable {
@Override
public void run() {
- String currentBucketPrefix =
String.valueOf(getCurrentBucketPrefix());
- ConcurrentHashMap.KeySetView<String,AtomicInteger> keys =
map.keySet();
- // remove obsolete keys
- keys.removeIf(k -> !k.startsWith(currentBucketPrefix));
+ periodicEvict();
}
}
@@ -215,5 +215,4 @@ public class TimeBucketCounter {
}
}
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]