Author: trustin Date: Wed Dec 8 07:40:31 2004 New Revision: 111280 URL: http://svn.apache.org/viewcvs?view=rev&rev=111280 Log: Added WriteBuffer.putMarker() and SessionHandler.markerReleased(). Once a marker is put, it is released when all data that was put into the write buffer before the marker has been flushed to the channel. I used this technique to implement the messageSent event more accurately, and it works great. Modified: incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java
Modified: incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java&r2=111280 ============================================================================== --- incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java (original) +++ incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java Wed Dec 8 07:40:31 2004 @@ -77,4 +77,7 @@ rb.signal(); } } + + public void markerReleased(Session session, Object marker) { + } } Modified: incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java&r2=111280 ============================================================================== --- incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java (original) +++ incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java Wed Dec 8 07:40:31 2004 @@ -59,4 +59,7 @@ public void dataWritten(Session session, 
int writtenBytes) { } + + public void markerReleased(Session session, Object marker) { + } } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java&r2=111280 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java Wed Dec 8 07:40:31 2004 @@ -39,4 +39,6 @@ void dataRead(Session session, int readBytes); void dataWritten(Session session, int writtenBytes); + + void markerReleased(Session session, Object marker); } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java&r2=111280 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java Wed Dec 8 07:40:31 2004 @@ -71,4 +71,6 @@ ByteBuffer asByteBuffer(); WriteBuffer flush(); + + WriteBuffer putMarker(Object marker); } Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java&r2=111280 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java Wed Dec 8 07:40:31 2004 @@ -20,12 +20,10 @@ package org.apache.mina.core.socket; import java.io.IOException; - import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; - import java.util.Iterator; import java.util.Set; @@ -342,6 +340,26 @@ if (writtenBytes > 0) { session.increaseWrittenBytes(writtenBytes); fireDataWritten(session, writtenBytes); + Queue markers = lock.getMarkers(); + for (;;) { + TcpWriteBuffer.Marker marker = (TcpWriteBuffer.Marker) markers.first(); + if (marker == null) + break; + + int bytesLeft = marker.getBytesLeft(); + if (bytesLeft > writtenBytes) { + marker.setBytesLeft(bytesLeft - writtenBytes); + break; + } else if (bytesLeft == writtenBytes) { + markers.pop(); + fireMarkerRemoved(session, marker.getValue()); + break; + } else { + markers.pop(); + fireMarkerRemoved(session, marker.getValue()); + writtenBytes -= bytesLeft; + } + } } } } catch (IOException e) { @@ -384,6 +402,14 @@ private void fireDataWritten(TcpSession session, int writtenBytes) { try { session.getHandler().dataWritten(session, writtenBytes); + } catch (Throwable e) { + fireExceptionCaught(session, e); + } + } + + private void 
fireMarkerRemoved(TcpSession session, Object marker) { + try { + session.getHandler().markerReleased(session, marker); } catch (Throwable e) { fireExceptionCaught(session, e); } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java&r2=111280 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java Wed Dec 8 07:40:31 2004 @@ -111,7 +111,7 @@ void flush() { TcpIoProcessor.getInstance().flushSession(this); } - + public boolean isConnected() { return ch.isConnected(); } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java&r2=111280 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java Wed Dec 8 07:40:31 2004 @@ -24,6 +24,7 @@ import org.apache.mina.core.ReadBuffer; import 
org.apache.mina.core.WriteBuffer; +import org.apache.mina.util.Queue; /** @@ -35,6 +36,7 @@ class TcpWriteBuffer implements WriteBuffer { private final TcpSession session; private final ByteBuffer buf; + private final Queue markers = new Queue(16); TcpWriteBuffer(TcpSession session, ByteBuffer buf) { this.session = session; @@ -122,7 +124,7 @@ session.flush(); return this; } - + public boolean hasRemaining() { return buf.hasRemaining(); } @@ -148,5 +150,43 @@ public WriteBuffer reset() { buf.reset(); return this; + } + + Queue getMarkers() { + return markers; + } + + public WriteBuffer putMarker(Object marker) { + int bytesLeft; + if (markers.isEmpty()) { + bytesLeft = buf.position(); + } else { + bytesLeft = buf.position() - ((Marker) markers.last()).getBytesLeft(); + } + + markers.push(new Marker(marker, bytesLeft)); + return this; + } + + static class Marker { + private final Object value; + private int bytesLeft; + + private Marker(Object value, int bytesLeft) { + this.value = value; + this.bytesLeft = bytesLeft; + } + + public Object getValue() { + return value; + } + + public int getBytesLeft() { + return bytesLeft; + } + + public void setBytesLeft(int bytesLeft) { + this.bytesLeft = bytesLeft; + } } } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java&r2=111280 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java Wed Dec 8 07:40:31 2004 
@@ -110,6 +110,10 @@ write(session); } + public void markerReleased(Session session, Object marker) { + fireMessageSent((ProtocolSession) session.getAttachment(), marker); + } + private void write(Session session) { ProtocolSessionImpl psession = (ProtocolSessionImpl) session.getAttachment(); @@ -124,9 +128,8 @@ while (!writeQueue.isEmpty()) { synchronized (out) { if (codec.encode(psession, writeQueue.first(), out)) { + out.putMarker(writeQueue.pop()); out.flush(); - // FIXME The message is not actually written. - fireMessageSent(psession, writeQueue.pop()); } else { out.flush(); break; Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java&r2=111280 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java Wed Dec 8 07:40:31 2004 @@ -118,6 +118,14 @@ return items[first]; } + + public Object last() { + if (size == 0) { + return null; + } + + return items[(last + items.length - 1) % items.length]; + } /** * Returns <code>true</code> if the queue is empty.
