Repository: mina Updated Branches: refs/heads/2.0 1223863fd -> e0da8d7fd
Fix for DIRMINA-629 : used a lock around all the methods to avoid concurrent issues. Removed the AtomicXXX which are not anymore useful. Project: http://git-wip-us.apache.org/repos/asf/mina/repo Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/e0da8d7f Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/e0da8d7f Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/e0da8d7f Branch: refs/heads/2.0 Commit: e0da8d7fdecf9d412063089b860a960caa0795bf Parents: 1223863 Author: Emmanuel Lécharny <[email protected]> Authored: Sat Sep 6 22:21:46 2014 +0200 Committer: Emmanuel Lécharny <[email protected]> Committed: Sat Sep 6 22:21:46 2014 +0200 ---------------------------------------------------------------------- .../mina/core/service/IoServiceStatistics.java | 407 ++++++++++++++----- 1 file changed, 304 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina/blob/e0da8d7f/mina-core/src/main/java/org/apache/mina/core/service/IoServiceStatistics.java ---------------------------------------------------------------------- diff --git a/mina-core/src/main/java/org/apache/mina/core/service/IoServiceStatistics.java b/mina-core/src/main/java/org/apache/mina/core/service/IoServiceStatistics.java index 44a82df..9ec4f17 100644 --- a/mina-core/src/main/java/org/apache/mina/core/service/IoServiceStatistics.java +++ b/mina-core/src/main/java/org/apache/mina/core/service/IoServiceStatistics.java @@ -20,7 +20,8 @@ package org.apache.mina.core.service; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * Provides usage statistics for an {@link AbstractIoService} instance. 
@@ -32,32 +33,46 @@ public class IoServiceStatistics { private AbstractIoService service; + /** The number of bytes read per second */ private double readBytesThroughput; + /** The number of bytes written per second */ private double writtenBytesThroughput; + /** The number of messages read per second */ private double readMessagesThroughput; + /** The number of messages written per second */ private double writtenMessagesThroughput; + /** The biggest number of bytes read per second */ private double largestReadBytesThroughput; + /** The biggest number of bytes written per second */ private double largestWrittenBytesThroughput; + /** The biggest number of messages read per second */ private double largestReadMessagesThroughput; + /** The biggest number of messages written per second */ private double largestWrittenMessagesThroughput; - private final AtomicLong readBytes = new AtomicLong(); + /** The number of read bytes since the service has been started */ + private long readBytes; - private final AtomicLong writtenBytes = new AtomicLong(); + /** The number of written bytes since the service has been started */ + private long writtenBytes; - private final AtomicLong readMessages = new AtomicLong(); + /** The number of read messages since the service has been started */ + private long readMessages; - private final AtomicLong writtenMessages = new AtomicLong(); + /** The number of written messages since the service has been started */ + private long writtenMessages; + /** The time the last read operation occurred */ private long lastReadTime; + /** The time the last write operation occurred */ private long lastWriteTime; private long lastReadBytes; @@ -70,162 +85,246 @@ public class IoServiceStatistics { private long lastThroughputCalculationTime; - private final AtomicInteger scheduledWriteBytes = new AtomicInteger(); + private int scheduledWriteBytes; - private final AtomicInteger scheduledWriteMessages = new AtomicInteger(); + private int scheduledWriteMessages; 
- private int throughputCalculationInterval = 3; + /** The time (in second) between the computation of the service's statistics */ + private final AtomicInteger throughputCalculationInterval = new AtomicInteger(3); - private final Object throughputCalculationLock = new Object(); + private final Lock throughputCalculationLock = new ReentrantLock(); public IoServiceStatistics(AbstractIoService service) { this.service = service; } /** - * Returns the maximum number of sessions which were being managed at the - * same time. + * @return The maximum number of sessions which were being managed at the + * same time. */ public final int getLargestManagedSessionCount() { return service.getListeners().getLargestManagedSessionCount(); } /** - * Returns the cumulative number of sessions which were managed (or are - * being managed) by this service, which means 'currently managed session - * count + closed session count'. + * @return The cumulative number of sessions which were managed (or are + * being managed) by this service, which means 'currently managed + * session count + closed session count'. */ public final long getCumulativeManagedSessionCount() { return service.getListeners().getCumulativeManagedSessionCount(); } /** - * Returns the time in millis when I/O occurred lastly. + * @return the time in millis when the last I/O operation (read or write) + * occurred. */ public final long getLastIoTime() { - return Math.max(lastReadTime, lastWriteTime); + throughputCalculationLock.lock(); + + try { + return Math.max(lastReadTime, lastWriteTime); + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the time in millis when read operation occurred lastly. + * @return The time in millis when the last read operation occurred. 
*/ public final long getLastReadTime() { - return lastReadTime; + throughputCalculationLock.lock(); + + try { + return lastReadTime; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the time in millis when write operation occurred lastly. + * @return The time in millis when the last write operation occurred. */ public final long getLastWriteTime() { - return lastWriteTime; + throughputCalculationLock.lock(); + + try { + return lastWriteTime; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the number of bytes read by this service - * - * @return - * The number of bytes this service has read + * @return The number of bytes this service has read so far */ public final long getReadBytes() { - return readBytes.get(); + throughputCalculationLock.lock(); + + try { + return readBytes; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the number of bytes written out by this service - * - * @return - * The number of bytes this service has written + * @return The number of bytes this service has written so far */ public final long getWrittenBytes() { - return writtenBytes.get(); + throughputCalculationLock.lock(); + + try { + return writtenBytes; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the number of messages this services has read - * - * @return - * The number of messages this services has read + * @return The number of messages this services has read so far */ public final long getReadMessages() { - return readMessages.get(); + throughputCalculationLock.lock(); + + try { + return readMessages; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the number of messages this service has written - * - * @return - * The number of messages this service has written + * @return The number of messages this service has written so far */ public final long getWrittenMessages() { - return writtenMessages.get(); + throughputCalculationLock.lock(); + + try { + 
return writtenMessages; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the number of read bytes per second. + * @return The number of read bytes per second. */ public final double getReadBytesThroughput() { - resetThroughput(); - return readBytesThroughput; + throughputCalculationLock.lock(); + + try { + resetThroughput(); + return readBytesThroughput; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the number of written bytes per second. + * @return The number of written bytes per second. */ public final double getWrittenBytesThroughput() { - resetThroughput(); - return writtenBytesThroughput; + throughputCalculationLock.lock(); + + try { + resetThroughput(); + return writtenBytesThroughput; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the number of read messages per second. + * @return The number of read messages per second. */ public final double getReadMessagesThroughput() { - resetThroughput(); - return readMessagesThroughput; + throughputCalculationLock.lock(); + + try { + resetThroughput(); + return readMessagesThroughput; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the number of written messages per second. + * @return The number of written messages per second. */ public final double getWrittenMessagesThroughput() { - resetThroughput(); - return writtenMessagesThroughput; + throughputCalculationLock.lock(); + + try { + resetThroughput(); + return writtenMessagesThroughput; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the maximum of the {@link #getReadBytesThroughput() readBytesThroughput}. + * @return The maximum number of bytes read per second since the service has + * been started. 
*/ public final double getLargestReadBytesThroughput() { - return largestReadBytesThroughput; + throughputCalculationLock.lock(); + + try { + return largestReadBytesThroughput; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the maximum of the {@link #getWrittenBytesThroughput() writtenBytesThroughput}. + * @return The maximum number of bytes written per second since the service + * has been started. */ public final double getLargestWrittenBytesThroughput() { - return largestWrittenBytesThroughput; + throughputCalculationLock.lock(); + + try { + return largestWrittenBytesThroughput; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the maximum of the {@link #getReadMessagesThroughput() readMessagesThroughput}. + * @return The maximum number of messages read per second since the service + * has been started. */ public final double getLargestReadMessagesThroughput() { - return largestReadMessagesThroughput; + throughputCalculationLock.lock(); + + try { + return largestReadMessagesThroughput; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the maximum of the {@link #getWrittenMessagesThroughput() writtenMessagesThroughput}. + * @return The maximum number of messages written per second since the + * service has been started. */ public final double getLargestWrittenMessagesThroughput() { - return largestWrittenMessagesThroughput; + throughputCalculationLock.lock(); + + try { + return largestWrittenMessagesThroughput; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the interval (seconds) between each throughput calculation. - * The default value is <tt>3</tt> seconds. + * @return the interval (seconds) between each throughput calculation. The + * default value is <tt>3</tt> seconds. 
*/ public final int getThroughputCalculationInterval() { - return throughputCalculationInterval; + return throughputCalculationInterval.get(); } /** @@ -233,7 +332,7 @@ public class IoServiceStatistics { * The default value is <tt>3</tt> seconds. */ public final long getThroughputCalculationIntervalInMillis() { - return throughputCalculationInterval * 1000L; + return throughputCalculationInterval.get() * 1000L; } /** @@ -245,26 +344,44 @@ public class IoServiceStatistics { throw new IllegalArgumentException("throughputCalculationInterval: " + throughputCalculationInterval); } - this.throughputCalculationInterval = throughputCalculationInterval; + this.throughputCalculationInterval.set(throughputCalculationInterval); } /** * Sets last time at which a read occurred on the service. + * + * @param lastReadTime + * The last time a read has occurred */ protected final void setLastReadTime(long lastReadTime) { - this.lastReadTime = lastReadTime; + throughputCalculationLock.lock(); + + try { + this.lastReadTime = lastReadTime; + } finally { + throughputCalculationLock.unlock(); + } } /** * Sets last time at which a write occurred on the service. + * + * @param lastWriteTime + * The last time a write has occurred */ protected final void setLastWriteTime(long lastWriteTime) { - this.lastWriteTime = lastWriteTime; + throughputCalculationLock.lock(); + + try { + this.lastWriteTime = lastWriteTime; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Resets the throughput counters of the service if none session - * is currently managed. + * Resets the throughput counters of the service if no session is currently + * managed. */ private void resetThroughput() { if (service.getManagedSessionCount() == 0) { @@ -279,17 +396,20 @@ public class IoServiceStatistics { * Updates the throughput counters.
*/ public void updateThroughput(long currentTime) { - synchronized (throughputCalculationLock) { + throughputCalculationLock.lock(); + + try { int interval = (int) (currentTime - lastThroughputCalculationTime); long minInterval = getThroughputCalculationIntervalInMillis(); - if (minInterval == 0 || interval < minInterval) { + + if ((minInterval == 0) || (interval < minInterval)) { return; } - long readBytes = this.readBytes.get(); - long writtenBytes = this.writtenBytes.get(); - long readMessages = this.readMessages.get(); - long writtenMessages = this.writtenMessages.get(); + long readBytes = this.readBytes; + long writtenBytes = this.writtenBytes; + long readMessages = this.readMessages; + long writtenMessages = this.writtenMessages; readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval; writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval; @@ -299,12 +419,15 @@ public class IoServiceStatistics { if (readBytesThroughput > largestReadBytesThroughput) { largestReadBytesThroughput = readBytesThroughput; } + if (writtenBytesThroughput > largestWrittenBytesThroughput) { largestWrittenBytesThroughput = writtenBytesThroughput; } + if (readMessagesThroughput > largestReadMessagesThroughput) { largestReadMessagesThroughput = readMessagesThroughput; } + if (writtenMessagesThroughput > largestWrittenMessagesThroughput) { largestWrittenMessagesThroughput = writtenMessagesThroughput; } @@ -315,84 +438,162 @@ public class IoServiceStatistics { lastWrittenMessages = writtenMessages; lastThroughputCalculationTime = currentTime; + } finally { + throughputCalculationLock.unlock(); } } /** - * Increases the count of read bytes by <code>increment</code> and sets + * Increases the count of read bytes by <code>nbBytesRead</code> and sets * the last read time to <code>currentTime</code>. 
- */ - public final void increaseReadBytes(long increment, long currentTime) { - readBytes.addAndGet(increment); - lastReadTime = currentTime; + * + * @param nbBytesRead + * The number of bytes read + * @param currentTime + * The date those bytes were read + */ + public final void increaseReadBytes(long nbBytesRead, long currentTime) { + throughputCalculationLock.lock(); + + try { + readBytes += nbBytesRead; + lastReadTime = currentTime; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Increases the count of read messages by 1 and sets the last read time to + * Increases the count of read messages by 1 and sets the last read time to * <code>currentTime</code>. + * + * @param currentTime + * The time the message has been read */ public final void increaseReadMessages(long currentTime) { - readMessages.incrementAndGet(); - lastReadTime = currentTime; + throughputCalculationLock.lock(); + + try { + readMessages++; + lastReadTime = currentTime; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Increases the count of written bytes by <code>increment</code> and sets - * the last write time to <code>currentTime</code>. + * Increases the count of written bytes by <code>nbBytesWritten</code> and + * sets the last write time to <code>currentTime</code>. + * + * @param nbBytesWritten + * The number of bytes written + * @param currentTime + * The date those bytes were written */ - public final void increaseWrittenBytes(int increment, long currentTime) { - writtenBytes.addAndGet(increment); - lastWriteTime = currentTime; + public final void increaseWrittenBytes(int nbBytesWritten, long currentTime) { + throughputCalculationLock.lock(); + + try { + writtenBytes += nbBytesWritten; + lastWriteTime = currentTime; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Increases the count of written messages by 1 and sets the last write time to - * <code>currentTime</code>. 
+ * Increases the count of written messages by 1 and sets the last write time + * to <code>currentTime</code>. + * + * @param currentTime + * The date the message was written */ public final void increaseWrittenMessages(long currentTime) { - writtenMessages.incrementAndGet(); - lastWriteTime = currentTime; + throughputCalculationLock.lock(); + + try { + writtenMessages++; + lastWriteTime = currentTime; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the count of bytes scheduled for write. + * @return The count of bytes scheduled for write. */ public final int getScheduledWriteBytes() { - return scheduledWriteBytes.get(); + throughputCalculationLock.lock(); + + try { + return scheduledWriteBytes; + } finally { + throughputCalculationLock.unlock(); + } } /** * Increments by <code>increment</code> the count of bytes scheduled for write. */ public final void increaseScheduledWriteBytes(int increment) { - scheduledWriteBytes.addAndGet(increment); + throughputCalculationLock.lock(); + + try { + scheduledWriteBytes += increment; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Returns the count of messages scheduled for write. + * @return the count of messages scheduled for write. */ public final int getScheduledWriteMessages() { - return scheduledWriteMessages.get(); + throughputCalculationLock.lock(); + + try { + return scheduledWriteMessages; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Increments by 1 the count of messages scheduled for write. + * Increments the count of messages scheduled for write. */ public final void increaseScheduledWriteMessages() { - scheduledWriteMessages.incrementAndGet(); + throughputCalculationLock.lock(); + + try { + scheduledWriteMessages++; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Decrements by 1 the count of messages scheduled for write. + * Decrements the count of messages scheduled for write.
*/ public final void decreaseScheduledWriteMessages() { - scheduledWriteMessages.decrementAndGet(); + throughputCalculationLock.lock(); + + try { + scheduledWriteMessages--; + } finally { + throughputCalculationLock.unlock(); + } } /** - * Sets the time at which throughtput counters where updated. + * Sets the time at which throughput counters were updated. */ protected void setLastThroughputCalculationTime(long lastThroughputCalculationTime) { - this.lastThroughputCalculationTime = lastThroughputCalculationTime; + throughputCalculationLock.lock(); + + try { + this.lastThroughputCalculationTime = lastThroughputCalculationTime; + } finally { + throughputCalculationLock.unlock(); + } } }
