Repository: mina
Updated Branches:
  refs/heads/2.0 1223863fd -> e0da8d7fd


Fix for DIRMINA-629: used a lock around all the methods to avoid
concurrency issues. Removed the AtomicXXX fields, which are no longer useful.

Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/e0da8d7f
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/e0da8d7f
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/e0da8d7f

Branch: refs/heads/2.0
Commit: e0da8d7fdecf9d412063089b860a960caa0795bf
Parents: 1223863
Author: Emmanuel Lécharny <[email protected]>
Authored: Sat Sep 6 22:21:46 2014 +0200
Committer: Emmanuel Lécharny <[email protected]>
Committed: Sat Sep 6 22:21:46 2014 +0200

----------------------------------------------------------------------
 .../mina/core/service/IoServiceStatistics.java  | 407 ++++++++++++++-----
 1 file changed, 304 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/e0da8d7f/mina-core/src/main/java/org/apache/mina/core/service/IoServiceStatistics.java
----------------------------------------------------------------------
diff --git 
a/mina-core/src/main/java/org/apache/mina/core/service/IoServiceStatistics.java 
b/mina-core/src/main/java/org/apache/mina/core/service/IoServiceStatistics.java
index 44a82df..9ec4f17 100644
--- 
a/mina-core/src/main/java/org/apache/mina/core/service/IoServiceStatistics.java
+++ 
b/mina-core/src/main/java/org/apache/mina/core/service/IoServiceStatistics.java
@@ -20,7 +20,8 @@
 package org.apache.mina.core.service;
 
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Provides usage statistics for an {@link AbstractIoService} instance.
@@ -32,32 +33,46 @@ public class IoServiceStatistics {
 
     private AbstractIoService service;
 
+    /** The number of bytes read per second */
     private double readBytesThroughput;
 
+    /** The number of bytes written per second */
     private double writtenBytesThroughput;
 
+    /** The number of messages read per second */
     private double readMessagesThroughput;
 
+    /** The number of messages written per second */
     private double writtenMessagesThroughput;
 
+    /** The biggest number of bytes read per second */
     private double largestReadBytesThroughput;
 
+    /** The biggest number of bytes written per second */
     private double largestWrittenBytesThroughput;
 
+    /** The biggest number of messages read per second */
     private double largestReadMessagesThroughput;
 
+    /** The biggest number of messages written per second */
     private double largestWrittenMessagesThroughput;
 
-    private final AtomicLong readBytes = new AtomicLong();
+    /** The number of read bytes since the service has been started */
+    private long readBytes;
 
-    private final AtomicLong writtenBytes = new AtomicLong();
+    /** The number of written bytes since the service has been started */
+    private long writtenBytes;
 
-    private final AtomicLong readMessages = new AtomicLong();
+    /** The number of read messages since the service has been started */
+    private long readMessages;
 
-    private final AtomicLong writtenMessages = new AtomicLong();
+    /** The number of written messages since the service has been started */
+    private long writtenMessages;
 
+    /** The time the last read operation occurred */
     private long lastReadTime;
 
+    /** The time the last write operation occurred */
     private long lastWriteTime;
 
     private long lastReadBytes;
@@ -70,162 +85,246 @@ public class IoServiceStatistics {
 
     private long lastThroughputCalculationTime;
 
-    private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
+    private int scheduledWriteBytes;
 
-    private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
+    private int scheduledWriteMessages;
 
-    private int throughputCalculationInterval = 3;
+    /** The time (in seconds) between computations of the service's statistics */
+    private final AtomicInteger throughputCalculationInterval = new 
AtomicInteger(3);
 
-    private final Object throughputCalculationLock = new Object();
+    private final Lock throughputCalculationLock = new ReentrantLock();
 
     public IoServiceStatistics(AbstractIoService service) {
         this.service = service;
     }
 
     /**
-     * Returns the maximum number of sessions which were being managed at the
-     * same time.
+     * @return The maximum number of sessions which were being managed at the
+     *         same time.
      */
     public final int getLargestManagedSessionCount() {
         return service.getListeners().getLargestManagedSessionCount();
     }
 
     /**
-     * Returns the cumulative number of sessions which were managed (or are
-     * being managed) by this service, which means 'currently managed session
-     * count + closed session count'.
+     * @return The cumulative number of sessions which were managed (or are
+     *         being managed) by this service, which means 'currently managed
+     *         session count + closed session count'.
      */
     public final long getCumulativeManagedSessionCount() {
         return service.getListeners().getCumulativeManagedSessionCount();
     }
 
     /**
-     * Returns the time in millis when I/O occurred lastly.
+     * @return the time in millis when the last I/O operation (read or write)
+     *         occurred.
      */
     public final long getLastIoTime() {
-        return Math.max(lastReadTime, lastWriteTime);
+        throughputCalculationLock.lock();
+
+        try {
+            return Math.max(lastReadTime, lastWriteTime);
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the time in millis when read operation occurred lastly.
+     * @return The time in millis when the last read operation occurred.
      */
     public final long getLastReadTime() {
-        return lastReadTime;
+        throughputCalculationLock.lock();
+
+        try {
+            return lastReadTime;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the time in millis when write operation occurred lastly.
+     * @return The time in millis when the last write operation occurred.
      */
     public final long getLastWriteTime() {
-        return lastWriteTime;
+        throughputCalculationLock.lock();
+
+        try {
+            return lastWriteTime;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the number of bytes read by this service
-     *
-     * @return
-     *     The number of bytes this service has read
+     * @return The number of bytes this service has read so far
      */
     public final long getReadBytes() {
-        return readBytes.get();
+        throughputCalculationLock.lock();
+
+        try {
+            return readBytes;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the number of bytes written out by this service
-     *
-     * @return
-     *     The number of bytes this service has written
+     * @return The number of bytes this service has written so far
      */
     public final long getWrittenBytes() {
-        return writtenBytes.get();
+        throughputCalculationLock.lock();
+
+        try {
+            return writtenBytes;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the number of messages this services has read
-     *
-     * @return
-     *     The number of messages this services has read
+     * @return The number of messages this service has read so far
      */
     public final long getReadMessages() {
-        return readMessages.get();
+        throughputCalculationLock.lock();
+
+        try {
+            return readMessages;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the number of messages this service has written
-     *
-     * @return
-     *     The number of messages this service has written
+     * @return The number of messages this service has written so far
      */
     public final long getWrittenMessages() {
-        return writtenMessages.get();
+        throughputCalculationLock.lock();
+
+        try {
+            return writtenMessages;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the number of read bytes per second.
+     * @return The number of read bytes per second.
      */
     public final double getReadBytesThroughput() {
-        resetThroughput();
-        return readBytesThroughput;
+        throughputCalculationLock.lock();
+
+        try {
+            resetThroughput();
+            return readBytesThroughput;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the number of written bytes per second.
+     * @return The number of written bytes per second.
      */
     public final double getWrittenBytesThroughput() {
-        resetThroughput();
-        return writtenBytesThroughput;
+        throughputCalculationLock.lock();
+
+        try {
+            resetThroughput();
+            return writtenBytesThroughput;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the number of read messages per second.
+     * @return The number of read messages per second.
      */
     public final double getReadMessagesThroughput() {
-        resetThroughput();
-        return readMessagesThroughput;
+        throughputCalculationLock.lock();
+
+        try {
+            resetThroughput();
+            return readMessagesThroughput;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the number of written messages per second.
+     * @return The number of written messages per second.
      */
     public final double getWrittenMessagesThroughput() {
-        resetThroughput();
-        return writtenMessagesThroughput;
+        throughputCalculationLock.lock();
+
+        try {
+            resetThroughput();
+            return writtenMessagesThroughput;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the maximum of the {@link #getReadBytesThroughput() 
readBytesThroughput}.
+     * @return The maximum number of bytes read per second since the service has
+     *         been started.
      */
     public final double getLargestReadBytesThroughput() {
-        return largestReadBytesThroughput;
+        throughputCalculationLock.lock();
+
+        try {
+            return largestReadBytesThroughput;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the maximum of the {@link #getWrittenBytesThroughput() 
writtenBytesThroughput}.
+     * @return The maximum number of bytes written per second since the service
+     *         has been started.
      */
     public final double getLargestWrittenBytesThroughput() {
-        return largestWrittenBytesThroughput;
+        throughputCalculationLock.lock();
+
+        try {
+            return largestWrittenBytesThroughput;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the maximum of the {@link #getReadMessagesThroughput() 
readMessagesThroughput}.
+     * @return The maximum number of messages read per second since the service
+     *         has been started.
      */
     public final double getLargestReadMessagesThroughput() {
-        return largestReadMessagesThroughput;
+        throughputCalculationLock.lock();
+
+        try {
+            return largestReadMessagesThroughput;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the maximum of the {@link #getWrittenMessagesThroughput() 
writtenMessagesThroughput}.
+     * @return The maximum number of messages written per second since the
+     *         service has been started.
      */
     public final double getLargestWrittenMessagesThroughput() {
-        return largestWrittenMessagesThroughput;
+        throughputCalculationLock.lock();
+
+        try {
+            return largestWrittenMessagesThroughput;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the interval (seconds) between each throughput calculation.
-     * The default value is <tt>3</tt> seconds.
+     * @return the interval (seconds) between each throughput calculation. The
+     *         default value is <tt>3</tt> seconds.
      */
     public final int getThroughputCalculationInterval() {
-        return throughputCalculationInterval;
+        return throughputCalculationInterval.get();
     }
 
     /**
@@ -233,7 +332,7 @@ public class IoServiceStatistics {
      * The default value is <tt>3</tt> seconds.
      */
     public final long getThroughputCalculationIntervalInMillis() {
-        return throughputCalculationInterval * 1000L;
+        return throughputCalculationInterval.get() * 1000L;
     }
 
     /**
@@ -245,26 +344,44 @@ public class IoServiceStatistics {
             throw new IllegalArgumentException("throughputCalculationInterval: 
" + throughputCalculationInterval);
         }
 
-        this.throughputCalculationInterval = throughputCalculationInterval;
+        this.throughputCalculationInterval.set(throughputCalculationInterval);
     }
 
     /**
      * Sets last time at which a read occurred on the service.
+     * 
+     * @param lastReadTime
+     *            The last time a read has occurred
      */
     protected final void setLastReadTime(long lastReadTime) {
-        this.lastReadTime = lastReadTime;
+        throughputCalculationLock.lock();
+
+        try {
+            this.lastReadTime = lastReadTime;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
      * Sets last time at which a write occurred on the service.
+     * 
+     * @param lastWriteTime
+     *            The last time a write has occurred
      */
     protected final void setLastWriteTime(long lastWriteTime) {
-        this.lastWriteTime = lastWriteTime;
+        throughputCalculationLock.lock();
+
+        try {
+            this.lastWriteTime = lastWriteTime;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Resets the throughput counters of the service if none session 
-     * is currently managed. 
+     * Resets the throughput counters of the service if no session is currently
+     * managed.
      */
     private void resetThroughput() {
         if (service.getManagedSessionCount() == 0) {
@@ -279,17 +396,20 @@ public class IoServiceStatistics {
      * Updates the throughput counters.
      */
     public void updateThroughput(long currentTime) {
-        synchronized (throughputCalculationLock) {
+        throughputCalculationLock.lock();
+
+        try {
             int interval = (int) (currentTime - lastThroughputCalculationTime);
             long minInterval = getThroughputCalculationIntervalInMillis();
-            if (minInterval == 0 || interval < minInterval) {
+
+            if ((minInterval == 0) || (interval < minInterval)) {
                 return;
             }
 
-            long readBytes = this.readBytes.get();
-            long writtenBytes = this.writtenBytes.get();
-            long readMessages = this.readMessages.get();
-            long writtenMessages = this.writtenMessages.get();
+            long readBytes = this.readBytes;
+            long writtenBytes = this.writtenBytes;
+            long readMessages = this.readMessages;
+            long writtenMessages = this.writtenMessages;
 
             readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / 
interval;
             writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 
1000.0 / interval;
@@ -299,12 +419,15 @@ public class IoServiceStatistics {
             if (readBytesThroughput > largestReadBytesThroughput) {
                 largestReadBytesThroughput = readBytesThroughput;
             }
+
             if (writtenBytesThroughput > largestWrittenBytesThroughput) {
                 largestWrittenBytesThroughput = writtenBytesThroughput;
             }
+
             if (readMessagesThroughput > largestReadMessagesThroughput) {
                 largestReadMessagesThroughput = readMessagesThroughput;
             }
+
             if (writtenMessagesThroughput > largestWrittenMessagesThroughput) {
                 largestWrittenMessagesThroughput = writtenMessagesThroughput;
             }
@@ -315,84 +438,162 @@ public class IoServiceStatistics {
             lastWrittenMessages = writtenMessages;
 
             lastThroughputCalculationTime = currentTime;
+        } finally {
+            throughputCalculationLock.unlock();
         }
     }
 
     /**
-     * Increases the count of read bytes by <code>increment</code> and sets 
+     * Increases the count of read bytes by <code>nbBytesRead</code> and sets
      * the last read time to <code>currentTime</code>.
-     */
-    public final void increaseReadBytes(long increment, long currentTime) {
-        readBytes.addAndGet(increment);
-        lastReadTime = currentTime;
+     * 
+     * @param nbBytesRead
+     *            The number of bytes read
+     * @param currentTime
+     *            The date those bytes were read
+     */
+    public final void increaseReadBytes(long nbBytesRead, long currentTime) {
+        throughputCalculationLock.lock();
+
+        try {
+            readBytes += nbBytesRead;
+            lastReadTime = currentTime;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Increases the count of read messages by 1 and sets the last read time 
to 
+     * Increases the count of read messages by 1 and sets the last read time to
      * <code>currentTime</code>.
+     * 
+     * @param currentTime
+     *            The time the message has been read
      */
     public final void increaseReadMessages(long currentTime) {
-        readMessages.incrementAndGet();
-        lastReadTime = currentTime;
+        throughputCalculationLock.lock();
+
+        try {
+            readMessages++;
+            lastReadTime = currentTime;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Increases the count of written bytes by <code>increment</code> and sets 
-     * the last write time to <code>currentTime</code>.
+     * Increases the count of written bytes by <code>nbBytesWritten</code> and
+     * sets the last write time to <code>currentTime</code>.
+     * 
+     * @param nbBytesWritten
+     *            The number of bytes written
+     * @param currentTime
+     *            The date those bytes were written
      */
-    public final void increaseWrittenBytes(int increment, long currentTime) {
-        writtenBytes.addAndGet(increment);
-        lastWriteTime = currentTime;
+    public final void increaseWrittenBytes(int nbBytesWritten, long 
currentTime) {
+        throughputCalculationLock.lock();
+
+        try {
+            writtenBytes += nbBytesWritten;
+            lastWriteTime = currentTime;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Increases the count of written messages by 1 and sets the last write 
time to 
-     * <code>currentTime</code>.
+     * Increases the count of written messages by 1 and sets the last write 
time
+     * to <code>currentTime</code>.
+     * 
+     * @param currentTime
+     *            The time the message was written
      */
     public final void increaseWrittenMessages(long currentTime) {
-        writtenMessages.incrementAndGet();
-        lastWriteTime = currentTime;
+        throughputCalculationLock.lock();
+
+        try {
+            writtenMessages++;
+            lastWriteTime = currentTime;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the count of bytes scheduled for write.
+     * @return The count of bytes scheduled for write.
      */
     public final int getScheduledWriteBytes() {
-        return scheduledWriteBytes.get();
+        throughputCalculationLock.lock();
+
+        try {
+            return scheduledWriteBytes;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
      * Increments by <code>increment</code> the count of bytes scheduled for 
write.
      */
     public final void increaseScheduledWriteBytes(int increment) {
-        scheduledWriteBytes.addAndGet(increment);
+        throughputCalculationLock.lock();
+
+        try {
+            scheduledWriteBytes += increment;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Returns the count of messages scheduled for write.
+     * @return the count of messages scheduled for write.
      */
     public final int getScheduledWriteMessages() {
-        return scheduledWriteMessages.get();
+        throughputCalculationLock.lock();
+
+        try {
+            return scheduledWriteMessages;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Increments by 1 the count of messages scheduled for write.
+     * Increments the count of messages scheduled for write.
      */
     public final void increaseScheduledWriteMessages() {
-        scheduledWriteMessages.incrementAndGet();
+        throughputCalculationLock.lock();
+
+        try {
+            scheduledWriteMessages++;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Decrements by 1 the count of messages scheduled for write.
+     * Decrements the count of messages scheduled for write.
      */
     public final void decreaseScheduledWriteMessages() {
-        scheduledWriteMessages.decrementAndGet();
+        throughputCalculationLock.lock();
+
+        try {
+            scheduledWriteMessages--;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 
     /**
-     * Sets the time at which throughtput counters where updated.
+     * Sets the time at which throughput counters were updated.
      */
     protected void setLastThroughputCalculationTime(long 
lastThroughputCalculationTime) {
-        this.lastThroughputCalculationTime = lastThroughputCalculationTime;
+        throughputCalculationLock.lock();
+
+        try {
+            this.lastThroughputCalculationTime = lastThroughputCalculationTime;
+        } finally {
+            throughputCalculationLock.unlock();
+        }
     }
 }

Reply via email to