Author: proyal
Date: Sun Sep 2 09:54:52 2007
New Revision: 572031
URL: http://svn.apache.org/viewvc?rev=572031&view=rev
Log:
DIRMINA-429 and DIRMINA-430 - SocketSessionImpl now keeps track of whether or
not it is in the flush queue, removing the need to check the size of its write
request queue. SocketSessionImpl also keeps track of the number of
outstanding bytes-to-be-written in an AtomicInteger.
Modified:
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
Modified:
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?rev=572031&r1=572030&r2=572031&view=diff
==============================================================================
---
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
(original)
+++
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
Sun Sep 2 09:54:52 2007
@@ -46,14 +46,14 @@
// SocketIoProcessor.doFlush() will reset it after write is finished
// because the buffer will be passed with messageSent event.
- ((ByteBuffer) writeRequest.getMessage()).mark();
- synchronized (writeRequestQueue) {
- writeRequestQueue.add(writeRequest);
- if (writeRequestQueue.size() == 1
- && session.getTrafficMask().isWritable()) {
- // Notify SocketIoProcessor only when writeRequestQueue was empty.
- s.getIoProcessor().flush(s);
- }
+ ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
+ buffer.mark();
+
+ s.getScheduledWriteBytesCounter().addAndGet(buffer.remaining());
+ writeRequestQueue.add(writeRequest);
+
+ if (session.getTrafficMask().isWritable()) {
+ s.getIoProcessor().flush(s);
}
}
Modified:
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL:
http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=572031&r1=572030&r2=572031&view=diff
==============================================================================
---
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
(original)
+++
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
Sun Sep 2 09:54:52 2007
@@ -89,10 +89,11 @@
}
void flush(SocketSessionImpl session) {
- scheduleFlush(session);
- Selector selector = this.selector;
- if (selector != null) {
- selector.wakeup();
+ if ( scheduleFlush(session) ) {
+ Selector selector = this.selector;
+ if (selector != null) {
+ selector.wakeup();
+ }
}
}
@@ -108,8 +109,14 @@
removingSessions.add(session);
}
- private void scheduleFlush(SocketSessionImpl session) {
- flushingSessions.add(session);
+ private boolean scheduleFlush(SocketSessionImpl session) {
+ if ( session.getInFlushQueue().compareAndSet( false, true ) ) {
+ flushingSessions.add(session);
+
+ return true;
+ }
+
+ return false;
}
private void scheduleTrafficControl(SocketSessionImpl session) {
@@ -210,7 +217,7 @@
if (readBytes > 0) {
session.getFilterChain().fireMessageReceived(session, buf);
buf = null;
-
+
if (readBytes * 2 < session.getReadBufferSize()) {
if (session.getReadBufferSize() > 64) {
session.setReadBufferSize(session.getReadBufferSize() >>> 1);
@@ -299,6 +306,8 @@
if (session == null)
break;
+ session.getInFlushQueue().set( false );
+
if (!session.isConnected()) {
releaseWriteBuffers(session);
continue;
@@ -318,7 +327,10 @@
}
try {
- doFlush(session);
+ boolean flushedAll = doFlush(session);
+ if( flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.getInFlushQueue().get()) {
+ scheduleFlush( session );
+ }
} catch (IOException e) {
scheduleRemove(session);
session.getFilterChain().fireExceptionCaught(session, e);
@@ -329,11 +341,12 @@
private void releaseWriteBuffers(SocketSessionImpl session) {
Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
WriteRequest req;
-
- if ((req = writeRequestQueue.poll()) != null) {
+
+ while ((req = writeRequestQueue.poll()) != null) {
ByteBuffer buf = (ByteBuffer) req.getMessage();
try {
buf.release();
+ session.getScheduledWriteBytesCounter().addAndGet( -buf.remaining() );
} catch (IllegalStateException e) {
session.getFilterChain().fireExceptionCaught(session, e);
} finally {
@@ -342,7 +355,7 @@
if (buf.hasRemaining()) {
req.getFuture().setWritten(false);
} else {
- session.getFilterChain().fireMessageSent(session, req);
+ session.getFilterChain().fireMessageSent(session, req);
}
}
@@ -359,7 +372,7 @@
}
}
- private void doFlush(SocketSessionImpl session) throws IOException {
+ private boolean doFlush(SocketSessionImpl session) throws IOException {
// Clear OP_WRITE
SelectionKey key = session.getSelectionKey();
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -372,34 +385,36 @@
try {
for (;;) {
WriteRequest req = writeRequestQueue.peek();
-
+
if (req == null)
break;
-
+
ByteBuffer buf = (ByteBuffer) req.getMessage();
if (buf.remaining() == 0) {
writeRequestQueue.poll();
-
+
session.increaseWrittenMessages();
-
+
buf.reset();
session.getFilterChain().fireMessageSent(session, req);
continue;
}
-
+
if (key.isWritable()) {
writtenBytes += ch.write(buf.buf());
}
-
+
if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
// Kernel buffer is full or wrote too much.
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- break;
+ return false;
}
}
} finally {
session.increaseWrittenBytes(writtenBytes);
}
+
+ return true;
}
private void doUpdateTrafficMask() {
Modified:
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL:
http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?rev=572031&r1=572030&r2=572031&view=diff
==============================================================================
---
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
(original)
+++
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
Sun Sep 2 09:54:52 2007
@@ -25,6 +25,8 @@
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoFilterChain;
@@ -71,6 +73,10 @@
private final IoServiceListenerSupport serviceListeners;
+ private final AtomicBoolean inFlushQueue = new AtomicBoolean( false );
+
+ private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
+
private SelectionKey key;
private int readBufferSize = 1024;
@@ -184,13 +190,18 @@
* @throws ClassCastException if an element is not a {@link ByteBuffer}
*/
public int getScheduledWriteBytes() {
- int byteSize = 0;
+ return scheduledWriteBytes.get();
+ }
- for (WriteRequest request : writeRequestQueue) {
- byteSize += ((ByteBuffer) request.getMessage()).remaining();
- }
+ @Override
+ public void increaseWrittenBytes( int increment ) {
+ super.increaseWrittenBytes( increment );
- return byteSize;
+ scheduledWriteBytes.addAndGet( -increment );
+ }
+
+ AtomicInteger getScheduledWriteBytesCounter() {
+ return scheduledWriteBytes;
}
@Override
@@ -222,9 +233,13 @@
int getReadBufferSize() {
return readBufferSize;
}
-
+
void setReadBufferSize(int readBufferSize) {
this.readBufferSize = readBufferSize;
+ }
+
+ AtomicBoolean getInFlushQueue() {
+ return inFlushQueue;
}
private class SessionConfigImpl extends BaseIoSessionConfig implements