Author: trustin
Date: Wed Dec  8 07:40:31 2004
New Revision: 111280

URL: http://svn.apache.org/viewcvs?view=rev&rev=111280
Log:
Added WriteBuffer.putMarker() and SessionHandler.markerReleased().
Once a marker is put, it is released when all data put into write buffer before 
the marker is put is flushed to the channel.
I used this techinique to implement messageSent event more accurately and it 
works great.
Modified:
   
incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java
   
incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java
   
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java
   
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java
   
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java
   
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java
   
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java
   
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java
   
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java

Modified: 
incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java&r2=111280
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java
    (original)
+++ 
incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java
    Wed Dec  8 07:40:31 2004
@@ -77,4 +77,7 @@
             rb.signal();
         }
     }
+
+       public void markerReleased(Session session, Object marker) {
+       }
 }

Modified: 
incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java&r2=111280
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java
    (original)
+++ 
incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java
    Wed Dec  8 07:40:31 2004
@@ -59,4 +59,7 @@
 
     public void dataWritten(Session session, int writtenBytes) {
     }
+
+       public void markerReleased(Session session, Object marker) {
+       }
 }

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java&r2=111280
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java
 (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java
 Wed Dec  8 07:40:31 2004
@@ -39,4 +39,6 @@
     void dataRead(Session session, int readBytes);
 
     void dataWritten(Session session, int writtenBytes);
+    
+    void markerReleased(Session session, Object marker);
 }

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java&r2=111280
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java
    (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java
    Wed Dec  8 07:40:31 2004
@@ -71,4 +71,6 @@
     ByteBuffer asByteBuffer();
 
     WriteBuffer flush();
+    
+    WriteBuffer putMarker(Object marker);
 }

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java&r2=111280
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java
  (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java
  Wed Dec  8 07:40:31 2004
@@ -20,12 +20,10 @@
 package org.apache.mina.core.socket;
 
 import java.io.IOException;
-
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-
 import java.util.Iterator;
 import java.util.Set;
 
@@ -342,6 +340,26 @@
                 if (writtenBytes > 0) {
                     session.increaseWrittenBytes(writtenBytes);
                     fireDataWritten(session, writtenBytes);
+                    Queue markers = lock.getMarkers();
+                    for (;;) {
+                       TcpWriteBuffer.Marker marker = (TcpWriteBuffer.Marker) 
markers.first();
+                       if (marker == null)
+                               break;
+
+                       int bytesLeft = marker.getBytesLeft();
+                       if (bytesLeft > writtenBytes) {
+                               marker.setBytesLeft(bytesLeft - writtenBytes);
+                               break;
+                       } else if (bytesLeft == writtenBytes) {
+                               markers.pop();
+                               fireMarkerRemoved(session, marker.getValue());
+                               break;
+                       } else {
+                               markers.pop();
+                               fireMarkerRemoved(session, marker.getValue());
+                               writtenBytes -= bytesLeft;
+                       }
+                    }
                 }
             }
         } catch (IOException e) {
@@ -384,6 +402,14 @@
     private void fireDataWritten(TcpSession session, int writtenBytes) {
         try {
             session.getHandler().dataWritten(session, writtenBytes);
+        } catch (Throwable e) {
+            fireExceptionCaught(session, e);
+        }
+    }
+
+    private void fireMarkerRemoved(TcpSession session, Object marker) {
+        try {
+            session.getHandler().markerReleased(session, marker);
         } catch (Throwable e) {
             fireExceptionCaught(session, e);
         }

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java&r2=111280
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java
      (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java
      Wed Dec  8 07:40:31 2004
@@ -111,7 +111,7 @@
     void flush() {
         TcpIoProcessor.getInstance().flushSession(this);
     }
-
+    
     public boolean isConnected() {
         return ch.isConnected();
     }

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java&r2=111280
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java
  (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java
  Wed Dec  8 07:40:31 2004
@@ -24,6 +24,7 @@
 
 import org.apache.mina.core.ReadBuffer;
 import org.apache.mina.core.WriteBuffer;
+import org.apache.mina.util.Queue;
 
 
 /**
@@ -35,6 +36,7 @@
 class TcpWriteBuffer implements WriteBuffer {
     private final TcpSession session;
     private final ByteBuffer buf;
+    private final Queue markers = new Queue(16);
 
     TcpWriteBuffer(TcpSession session, ByteBuffer buf) {
         this.session = session;
@@ -122,7 +124,7 @@
         session.flush();
         return this;
     }
-
+    
     public boolean hasRemaining() {
         return buf.hasRemaining();
     }
@@ -148,5 +150,43 @@
     public WriteBuffer reset() {
         buf.reset();
         return this;
+    }
+    
+    Queue getMarkers() {
+       return markers;
+    }
+    
+    public WriteBuffer putMarker(Object marker) {
+       int bytesLeft;
+       if (markers.isEmpty()) {
+               bytesLeft = buf.position();
+       } else {
+               bytesLeft = buf.position() - ((Marker) 
markers.last()).getBytesLeft();
+       }
+       
+       markers.push(new Marker(marker, bytesLeft));
+       return this;
+    }
+    
+    static class Marker {
+       private final Object value;
+       private int bytesLeft;
+       
+       private Marker(Object value, int bytesLeft) {
+               this.value = value;
+               this.bytesLeft = bytesLeft;
+       }
+       
+       public Object getValue() {
+               return value;
+       }
+       
+       public int getBytesLeft() {
+               return bytesLeft;
+       }
+       
+       public void setBytesLeft(int bytesLeft) {
+               this.bytesLeft = bytesLeft;
+       }
     }
 }

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java&r2=111280
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java
        (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java
        Wed Dec  8 07:40:31 2004
@@ -110,6 +110,10 @@
             write(session);
         }
 
+               public void markerReleased(Session session, Object marker) {
+                       fireMessageSent((ProtocolSession) 
session.getAttachment(), marker);
+               }
+               
         private void write(Session session) {
             ProtocolSessionImpl psession =
                 (ProtocolSessionImpl) session.getAttachment();
@@ -124,9 +128,8 @@
                 while (!writeQueue.isEmpty()) {
                     synchronized (out) {
                         if (codec.encode(psession, writeQueue.first(), out)) {
+                               out.putMarker(writeQueue.pop());
                             out.flush();
-                            // FIXME The message is not actually written.
-                            fireMessageSent(psession, writeQueue.pop());
                         } else {
                             out.flush();
                             break;

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java&r2=111280
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java
  (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java
  Wed Dec  8 07:40:31 2004
@@ -118,6 +118,14 @@
 
         return items[first];
     }
+    
+    public Object last() {
+       if (size == 0) {
+               return null;
+       }
+       
+       return items[(last + items.length - 1) % items.length];
+    }
 
     /**
      * Returns <code>true</code> if the queue is empty.

Reply via email to