Author: trustin
Date: Sun Mar 16 23:39:20 2008
New Revision: 637741
URL: http://svn.apache.org/viewvc?rev=637741&view=rev
Log:
Fixed issue: DIRMINA-552 - writtenBytes and lastWriteTime are not updated
immediately.
* Modified every flush operation to call increaseWrittenBytes when it returns -
seems to work fine
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=637741&r1=637740&r2=637741&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
Sun Mar 16 23:39:20 2008
@@ -530,29 +530,7 @@
}
}
- protected final void increaseWrittenBytesAndMessages(
- WriteRequest request, long currentTime) {
-
- Object message = request.getMessage();
- if (message instanceof IoBuffer) {
- IoBuffer b = (IoBuffer) message;
- if (b.hasRemaining()) {
- increaseWrittenBytes(((IoBuffer) message).remaining(),
currentTime);
- } else {
- increaseWrittenMessages(currentTime);
- }
- } else if (message instanceof FileRegion) {
- FileRegion region = (FileRegion) message;
- if (region.getRemainingBytes() == 0) {
- increaseWrittenBytes(region.getWrittenBytes(), currentTime);
- increaseWrittenMessages(currentTime);
- }
- } else {
- increaseWrittenMessages(currentTime);
- }
- }
-
- private void increaseWrittenBytes(long increment, long currentTime) {
+ protected final void increaseWrittenBytes(long increment, long
currentTime) {
if (increment <= 0) {
return;
}
@@ -569,7 +547,16 @@
increaseScheduledWriteBytes(-increment);
}
- private void increaseWrittenMessages(long currentTime) {
+ protected final void increaseWrittenMessages(
+ WriteRequest request, long currentTime) {
+ Object message = request.getMessage();
+ if (message instanceof IoBuffer) {
+ IoBuffer b = (IoBuffer) message;
+ if (b.hasRemaining()) {
+ return;
+ }
+ }
+
writtenMessages++;
lastWriteTime = currentTime;
if (getService() instanceof AbstractIoService) {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java?rev=637741&r1=637740&r2=637741&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
Sun Mar 16 23:39:20 2008
@@ -303,10 +303,11 @@
processReadySessions(selectedHandles());
}
- flushSessions();
+ long currentTime = System.currentTimeMillis();
+ flushSessions(currentTime);
nHandles -= unregisterHandles();
- notifyIdleSessions();
+ notifyIdleSessions(currentTime);
if (nHandles == 0) {
synchronized (lock) {
@@ -379,7 +380,7 @@
}
}
- private void flushSessions() {
+ private void flushSessions(long currentTime) {
for (; ;) {
T session = flushingSessions.poll();
if (session == null) {
@@ -389,7 +390,7 @@
session.setScheduledForFlush(false);
try {
- boolean flushedAll = flush(session);
+ boolean flushedAll = flush(session, currentTime);
if (flushedAll &&
!session.getWriteRequestQueue().isEmpty(session) &&
!session.isScheduledForFlush()) {
scheduleFlush(session);
@@ -400,55 +401,58 @@
}
}
- private boolean flush(T session) throws Exception {
+ private boolean flush(T session, long currentTime) throws Exception {
// Clear OP_WRITE
setInterestedInWrite(session, false);
- WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
-
- int maxWrittenBytes =
+ final WriteRequestQueue writeRequestQueue =
session.getWriteRequestQueue();
+ final int maxWrittenBytes =
session.getConfig().getMaxReadBufferSize() +
(session.getConfig().getMaxReadBufferSize() >>> 1);
int writtenBytes = 0;
- for (; ;) {
- WriteRequest req = session.getCurrentWriteRequest();
- if (req == null) {
- req = writeRequestQueue.poll(session);
+ try {
+ for (; ;) {
+ WriteRequest req = session.getCurrentWriteRequest();
if (req == null) {
- break;
+ req = writeRequestQueue.poll(session);
+ if (req == null) {
+ break;
+ }
+ session.setCurrentWriteRequest(req);
+ }
+
+ IoBuffer buf = (IoBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ // Clear and fire event
+ session.setCurrentWriteRequest(null);
+ buf.reset();
+ session.getFilterChain().fireMessageSent(req);
+ continue;
+ }
+
+ SocketAddress destination = req.getDestination();
+ if (destination == null) {
+ destination = session.getRemoteAddress();
+ }
+
+ int localWrittenBytes = send(session, buf, destination);
+ if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes)
{
+ // Kernel buffer is full or wrote too much
+ setInterestedInWrite(session, true);
+ return false;
+ } else {
+ setInterestedInWrite(session, false);
+
+ // Clear and fire event
+ session.setCurrentWriteRequest(null);
+ writtenBytes += localWrittenBytes;
+ buf.reset();
+ session.getFilterChain().fireMessageSent(req);
}
- session.setCurrentWriteRequest(req);
- }
-
- IoBuffer buf = (IoBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- // Clear and fire event
- session.setCurrentWriteRequest(null);
- buf.reset();
- session.getFilterChain().fireMessageSent(req);
- continue;
- }
-
- SocketAddress destination = req.getDestination();
- if (destination == null) {
- destination = session.getRemoteAddress();
- }
-
- int localWrittenBytes = send(session, buf, destination);
- if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
- // Kernel buffer is full or wrote too much
- setInterestedInWrite(session, true);
- return false;
- } else {
- setInterestedInWrite(session, false);
-
- // Clear and fire event
- session.setCurrentWriteRequest(null);
- writtenBytes += localWrittenBytes;
- buf.reset();
- session.getFilterChain().fireMessageSent(req);
}
+ } finally {
+ session.increaseWrittenBytes(writtenBytes, currentTime);
}
return true;
@@ -524,9 +528,8 @@
return nHandles;
}
- private void notifyIdleSessions() {
+ private void notifyIdleSessions(long currentTime) {
// process idle sessions
- long currentTime = System.currentTimeMillis();
if (currentTime - lastIdleCheckTime >= 1000) {
lastIdleCheckTime = currentTime;
IdleStatusChecker.notifyIdleness(
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java?rev=637741&r1=637740&r2=637741&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
Sun Mar 16 23:39:20 2008
@@ -203,7 +203,7 @@
if (Thread.currentThread() == workerThread) {
// Bypass the queue if called from the worker thread itself
// (i.e. single thread model).
- flushNow(session);
+ flushNow(session, System.currentTimeMillis());
} else {
boolean needsWakeup = flushingSessions.isEmpty();
if (scheduleFlush(session) && needsWakeup) {
@@ -451,16 +451,15 @@
}
}
- private void notifyIdleSessions() throws Exception {
+ private void notifyIdleSessions(long currentTime) throws Exception {
// process idle sessions
- long currentTime = System.currentTimeMillis();
if (currentTime - lastIdleCheckTime >= 1000) {
lastIdleCheckTime = currentTime;
IdleStatusChecker.notifyIdleness(allSessions(), currentTime);
}
}
- private void flush() {
+ private void flush(long currentTime) {
for (; ;) {
T session = flushingSessions.poll();
@@ -473,7 +472,7 @@
switch (state) {
case OPEN:
try {
- boolean flushedAll = flushNow(session);
+ boolean flushedAll = flushNow(session, currentTime);
if (flushedAll &&
!session.getWriteRequestQueue().isEmpty(session) &&
!session.isScheduledForFlush()) {
scheduleFlush(session);
@@ -497,7 +496,7 @@
}
}
- private boolean flushNow(T session) {
+ private boolean flushNow(T session, long currentTime) {
if (!session.isConnected()) {
scheduleRemove(session);
return false;
@@ -506,18 +505,17 @@
final boolean hasFragmentation =
session.getTransportMetadata().hasFragmentation();
+ final WriteRequestQueue writeRequestQueue =
session.getWriteRequestQueue();
+
+ // Set limitation for the number of written bytes for read-write
+ // fairness. I used maxReadBufferSize * 3 / 2, which yields best
+ // performance in my experience while not breaking fairness much.
+ final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
+
+ (session.getConfig().getMaxReadBufferSize() >>>
1);
+ int writtenBytes = 0;
try {
// Clear OP_WRITE
setInterestedInWrite(session, false);
-
- WriteRequestQueue writeRequestQueue =
session.getWriteRequestQueue();
-
- // Set limitation for the number of written bytes for read-write
- // fairness. I used maxReadBufferSize * 3 / 2, which yields best
- // performance in my experience while not breaking fairness much.
- int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() +
- (session.getConfig().getMaxReadBufferSize()
>>> 1);
- int writtenBytes = 0;
do {
// Check for pending writes.
WriteRequest req = session.getCurrentWriteRequest();
@@ -540,7 +538,7 @@
session, req, hasFragmentation,
maxWrittenBytes - writtenBytes);
} else {
- throw new IllegalStateException("Don't know how to
handle message of type '" + message.getClass().getName() + "'. Are you missing
a protocol encoder?");
+ throw new IllegalStateException("Don't know how to handle
message of type '" + message.getClass().getName() + "'. Are you missing a
protocol encoder?");
}
writtenBytes += localWrittenBytes;
@@ -554,6 +552,8 @@
} catch (Exception e) {
session.getFilterChain().fireExceptionCaught(e);
return false;
+ } finally {
+ session.increaseWrittenBytes(writtenBytes, currentTime);
}
return true;
@@ -686,9 +686,10 @@
process();
}
- flush();
+ long currentTime = System.currentTimeMillis();
+ flush(currentTime);
nSessions -= remove();
- notifyIdleSessions();
+ notifyIdleSessions(currentTime);
if (nSessions == 0) {
synchronized (lock) {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=637741&r1=637740&r2=637741&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
Sun Mar 16 23:39:20 2008
@@ -410,7 +410,7 @@
}
public void fireMessageSent(WriteRequest request) {
- session.increaseWrittenBytesAndMessages(request,
System.currentTimeMillis());
+ session.increaseWrittenMessages(request, System.currentTimeMillis());
try {
request.getFuture().setWritten();
@@ -421,7 +421,7 @@
Entry head = this.head;
callNextMessageSent(head, session, request);
}
-
+
private void callNextMessageSent(Entry entry, IoSession session,
WriteRequest writeRequest) {
try {