Author: proyal
Date: Mon Sep  3 21:28:47 2007
New Revision: 572512

URL: http://svn.apache.org/viewvc?rev=572512&view=rev
Log:
port forward r572031 from 1.1 branch. fixes for DIRMINA-429 and DIRMINA-430

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
    
mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java Mon 
Sep  3 21:28:47 2007
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.common;
 
@@ -25,17 +25,17 @@
 /**
  * A dummy {@link IoSession} for unit-testing or non-network-use of
  * the classes that depends on {@link IoSession}.
- * 
+ *
  * @author The Apache MINA Project ([EMAIL PROTECTED])
  * @version $Rev$, $Date$
  */
 public class DummySession extends AbstractIoSession {
-    
+
     private static final TransportMetadata TRANSPORT_METADATA =
-        new DefaultTransportMetadata(
-                "dummy", false, false,
-                SocketAddress.class, IoSessionConfig.class, Object.class);
-    
+            new DefaultTransportMetadata(
+                    "dummy", false, false,
+                    SocketAddress.class, IoSessionConfig.class, Object.class);
+
     private static final SocketAddress ANONYMOUS_ADDRESS = new SocketAddress() 
{
         private static final long serialVersionUID = -496112902353454179L;
 
@@ -44,7 +44,7 @@
             return "?";
         }
     };
-    
+
     private volatile IoService service;
 
     private volatile IoSessionConfig config = new AbstractIoSessionConfig() {
@@ -52,7 +52,7 @@
         protected void doSetAll(IoSessionConfig config) {
         }
     };
-    
+
     private volatile IoFilterChain filterChain = new 
AbstractIoFilterChain(this) {
         @Override
         protected void doClose(IoSession session) throws Exception {
@@ -64,7 +64,7 @@
             fireMessageSent(session, writeRequest);
         }
     };
-    
+
     private volatile IoHandler handler = new IoHandlerAdapter();
     private volatile SocketAddress localAddress = ANONYMOUS_ADDRESS;
     private volatile SocketAddress remoteAddress = ANONYMOUS_ADDRESS;
@@ -100,14 +100,14 @@
                 return TRANSPORT_METADATA;
             }
         };
-        
+
         // Set meaningless default values.
         acceptor.setHandler(new IoHandlerAdapter());
         acceptor.setLocalAddress(ANONYMOUS_ADDRESS);
 
         this.service = acceptor;
     }
-    
+
     @Override
     protected void updateTrafficMask() {
     }
@@ -115,7 +115,7 @@
     public IoSessionConfig getConfig() {
         return config;
     }
-    
+
     /**
      * Sets the configuration of this session.
      */
@@ -123,14 +123,14 @@
         if (config == null) {
             throw new NullPointerException("config");
         }
-        
+
         this.config = config;
     }
 
     public IoFilterChain getFilterChain() {
         return filterChain;
     }
-    
+
     /**
      * Sets the filter chain that affects this session.
      */
@@ -138,14 +138,14 @@
         if (filterChain == null) {
             throw new NullPointerException("filterChain");
         }
-        
+
         this.filterChain = filterChain;
     }
 
     public IoHandler getHandler() {
         return handler;
     }
-    
+
     /**
      * Sets the {@link IoHandler} which handles this session.
      */
@@ -153,7 +153,7 @@
         if (handler == null) {
             throw new NullPointerException("handler");
         }
-        
+
         this.handler = handler;
     }
 
@@ -164,7 +164,7 @@
     public SocketAddress getRemoteAddress() {
         return remoteAddress;
     }
-    
+
     /**
      * Sets the socket address of local machine which is associated with
      * this session.
@@ -173,22 +173,22 @@
         if (localAddress == null) {
             throw new NullPointerException("localAddress");
         }
-        
+
         this.localAddress = localAddress;
     }
-    
+
     /**
-     * Sets the socket address of remote peer. 
+     * Sets the socket address of remote peer.
      */
     public void setRemoteAddress(SocketAddress remoteAddress) {
         if (remoteAddress == null) {
             throw new NullPointerException("remoteAddress");
         }
-        
+
         this.remoteAddress = remoteAddress;
     }
 
-    public int getScheduledWriteBytes() {
+    public long getScheduledWriteBytes() {
         return 0;
     }
 
@@ -199,7 +199,7 @@
     public IoService getService() {
         return service;
     }
-    
+
     /**
      * Sets the {@link IoService} which provides I/O service to this session.
      */
@@ -207,14 +207,14 @@
         if (service == null) {
             throw new NullPointerException("service");
         }
-        
+
         this.service = service;
     }
 
     public TransportMetadata getTransportMetadata() {
         return transportMetadata;
     }
-    
+
     /**
      * Sets the {@link TransportMetadata} that this session runs on.
      */
@@ -222,7 +222,7 @@
         if (transportMetadata == null) {
             throw new NullPointerException("transportMetadata");
         }
-        
+
         this.transportMetadata = transportMetadata;
     }
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java Mon Sep 
 3 21:28:47 2007
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.common;
 
@@ -23,32 +23,32 @@
 import java.util.Set;
 
 /**
- * A handle which represents connection between two end-points regardless of 
+ * A handle which represents connection between two end-points regardless of
  * transport types.
- * <p>
+ * <p/>
  * {@link IoSession} provides user-defined attributes.  User-defined attributes
  * are application-specific data which is associated with a session.
  * It often contains objects that represents the state of a higher-level 
protocol
  * and becomes a way to exchange data between filters and handlers.
- * 
+ * <p/>
  * <h3>Adjusting Transport Type Specific Properties</h3>
- * <p>
+ * <p/>
  * You can simply downcast the session to an appropriate subclass.
  * </p>
- * 
+ * <p/>
  * <h3>Thread Safety</h3>
- * <p>
+ * <p/>
  * {@link IoSession} is thread-safe.  But please note that performing
  * more than one {@link #write(Object)} calls at the same time will
- * cause the {@link IoFilter#filterWrite(IoFilter.NextFilter, IoSession, WriteRequest)}
+ * cause the {@link IoFilter#filterWrite(IoFilter.NextFilter,IoSession,WriteRequest)}
  * is executed simultaneously, and therefore you have to make sure the
- * {@link IoFilter} implementations you're using are thread-safe, too. 
+ * {@link IoFilter} implementations you're using are thread-safe, too.
  * </p>
- * 
+ * <p/>
  * <h3>Equality of Sessions</h3>
  * {@link #equals(Object)} and {@link #hashCode()} shall not be overriden
  * to the default behavior that is defined in {@link Object}.
- *   
+ *
  * @author The Apache MINA Project ([EMAIL PROTECTED])
  * @version $Rev$, $Date$
  */
@@ -73,7 +73,7 @@
      * Returns the filter chain that only affects this session.
      */
     IoFilterChain getFilterChain();
-    
+
     /**
      * Returns the {@link TransportMetadata} that this session runs on.
      */
@@ -81,7 +81,7 @@
 
     /**
      * Writes the specified <code>message</code> to remote peer.  This
-     * operation is asynchronous; {@link IoHandler#messageSent(IoSession, Object)}
+     * operation is asynchronous; {@link IoHandler#messageSent(IoSession,Object)}
      * will be invoked when the message is actually sent to remote peer.
      * You can also wait for the returned {@link WriteFuture} if you want
      * to wait for the message actually written.
@@ -104,14 +104,14 @@
     /**
      * Sets an attachment of this session.
      * This method is identical with <tt>setAttribute( "", attachment )</tt>.
-     * 
+     *
      * @return Old attachment.  <tt>null</tt> if it is new.
      */
     Object setAttachment(Object attachment);
 
     /**
      * Returns the value of user-defined attribute of this session.
-     * 
+     *
      * @param key the key of the attribute
      * @return <tt>null</tt> if there is no attribute with the specified key
      */
@@ -136,8 +136,8 @@
 
     /**
      * Sets a user-defined attribute.
-     * 
-     * @param key the key of the attribute
+     *
+     * @param key   the key of the attribute
      * @param value the value of the attribute
      * @return The old value of the attribute.  <tt>null</tt> if it is new.
      */
@@ -147,7 +147,7 @@
      * Sets a user defined attribute without a value.  This is useful when
      * you just want to put a 'mark' attribute.  Its value is set to
      * {@link Boolean#TRUE}.
-     * 
+     *
      * @param key the key of the attribute
      * @return The old value of the attribute.  <tt>null</tt> if it is new.
      */
@@ -169,7 +169,7 @@
 
     /**
      * Removes a user-defined attribute with the specified key.
-     * 
+     *
      * @return The old value of the attribute.  <tt>null</tt> if not found.
      */
     Object removeAttribute(String key);
@@ -235,7 +235,7 @@
     CloseFuture getCloseFuture();
 
     /**
-     * Returns the socket address of remote peer. 
+     * Returns the socket address of remote peer.
      */
     SocketAddress getRemoteAddress();
 
@@ -251,7 +251,7 @@
      * returns the {@link SocketAddress} which is specified as a parameter of
      * {@link IoAcceptor#bind()}.  If this session is managed by
      * {@link IoConnector}, this method returns the same address with
-     * that of {@link #getRemoteAddress()}.  
+     * that of {@link #getRemoteAddress()}.
      */
     SocketAddress getServiceAddress();
 
@@ -302,7 +302,7 @@
     long getWrittenBytes();
 
     /**
-     * Returns the total number of messages which were read and decoded from 
this session. 
+     * Returns the total number of messages which were read and decoded from 
this session.
      */
     long getReadMessages();
 
@@ -320,7 +320,7 @@
      * Returns the number of bytes which are scheduled to be written to this
      * session.
      */
-    int getScheduledWriteBytes();
+    long getScheduledWriteBytes();
 
     /**
      * Returns the time in millis when this session is created.
@@ -343,7 +343,7 @@
     long getLastWriteTime();
 
     /**
-     * Returns <code>true</code> if this session is idle for the specified 
+     * Returns <code>true</code> if this session is idle for the specified
      * {@link IdleStatus}.
      */
     boolean isIdle(IdleStatus status);
@@ -351,7 +351,7 @@
     /**
      * Returns the number of the fired continuous <tt>sessionIdle</tt> events
      * for the specified {@link IdleStatus}.
-     * <p>
+     * <p/>
      * If <tt>sessionIdle</tt> event is fired first after some time after I/O,
      * <tt>idleCount</tt> becomes <tt>1</tt>.  <tt>idleCount</tt> resets to
      * <tt>0</tt> if any I/O occurs again, otherwise it increases to

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
 Mon Sep  3 21:28:47 2007
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.socket.nio;
 
@@ -34,26 +34,26 @@
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
-import org.apache.mina.common.TransportMetadata;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.TransportMetadata;
 import org.apache.mina.common.WriteFuture;
 import org.apache.mina.common.WriteRequest;
 
 /**
  * An {@link IoSession} for datagram transport (UDP/IP).
- * 
+ *
  * @author The Apache MINA Project ([EMAIL PROTECTED])
  * @version $Rev$, $Date$
  */
 class DatagramSessionImpl extends AbstractIoSession implements DatagramSession 
{
-    
+
     static final TransportMetadata METADATA =
-        new DefaultTransportMetadata(
-                "datagram", true, false,
-                InetSocketAddress.class,
-                DatagramSessionConfig.class, ByteBuffer.class);
-    
+            new DefaultTransportMetadata(
+                    "datagram", true, false,
+                    InetSocketAddress.class,
+                    DatagramSessionConfig.class, ByteBuffer.class);
+
     private final IoService service;
 
     private final DatagramSessionConfig config = new SessionConfigImpl();
@@ -99,7 +99,7 @@
      * Creates a new connector instance.
      */
     DatagramSessionImpl(DatagramConnector service,
-            DatagramChannel ch, IoHandler defaultHandler) {
+                        DatagramChannel ch, IoHandler defaultHandler) {
         this.service = service;
         this.ch = ch;
         this.handler = defaultHandler;
@@ -138,7 +138,7 @@
     public IoHandler getHandler() {
         return handler;
     }
-    
+
     public TransportMetadata getTransportMetadata() {
         return METADATA;
     }
@@ -177,10 +177,10 @@
                 Object message = request.getMessage();
                 if (message instanceof ByteBuffer) {
                     if (((ByteBuffer) message).hasRemaining()) {
-                        size ++;
+                        size++;
                     }
                 } else {
-                    size ++;
+                    size++;
                 }
             }
         }
@@ -188,7 +188,7 @@
         return size;
     }
 
-    public int getScheduledWriteBytes() {
+    public long getScheduledWriteBytes() {
         int size = 0;
         synchronized (writeRequestQueue) {
             for (Object o : writeRequestQueue) {

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
 Mon Sep  3 21:28:47 2007
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.socket.nio;
 
@@ -28,7 +28,7 @@
 
 /**
  * An {@link IoFilterChain} for socket transport (TCP/IP).
- * 
+ *
  * @author The Apache MINA Project ([EMAIL PROTECTED])
  */
 class SocketFilterChain extends AbstractIoFilterChain {

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
 Mon Sep  3 21:28:47 2007
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.socket.nio;
 
@@ -23,7 +23,6 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-import java.util.Iterator;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -31,12 +30,12 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.FileRegion;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoServiceListenerSupport;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.RuntimeIOException;
-import org.apache.mina.common.FileRegion;
 import org.apache.mina.common.WriteRequest;
 import org.apache.mina.common.WriteTimeoutException;
 import org.apache.mina.util.NamePreservingRunnable;
@@ -111,8 +110,7 @@
 
     void flush(SocketSessionImpl session) {
         boolean needsWakeup = flushingSessions.isEmpty();
-        scheduleFlush(session);
-        if (needsWakeup) {
+        if (scheduleFlush(session) && needsWakeup) {
             selector.wakeup();
         }
     }
@@ -126,8 +124,14 @@
         removingSessions.offer(session);
     }
 
-    private void scheduleFlush(SocketSessionImpl session) {
-        flushingSessions.offer(session);
+    private boolean scheduleFlush(SocketSessionImpl session) {
+        if (session.getInFlushQueue().compareAndSet(false, true)) {
+            flushingSessions.add(session);
+
+            return true;
+        }
+
+        return false;
     }
 
     private void scheduleTrafficControl(SocketSessionImpl session) {
@@ -135,7 +139,7 @@
     }
 
     private void doAddNew() {
-        for (;;) {
+        for (; ;) {
             SocketSessionImpl session = newSessions.poll();
 
             if (session == null) {
@@ -169,7 +173,7 @@
     }
 
     private void doRemove() {
-        for (;;) {
+        for (; ;) {
             SocketSessionImpl session = removingSessions.poll();
 
             if (session == null) {
@@ -202,10 +206,7 @@
     }
 
     private void process(Set<SelectionKey> selectedKeys) {
-        Iterator<SelectionKey> it = selectedKeys.iterator();
-
-        while (it.hasNext()) {
-            SelectionKey key = it.next();
+        for (SelectionKey key : selectedKeys) {
             SocketSessionImpl session = (SocketSessionImpl) key.attachment();
 
             if (key.isReadable() && session.getTrafficMask().isReadable()) {
@@ -241,7 +242,7 @@
             if (readBytes > 0) {
                 session.getFilterChain().fireMessageReceived(session, buf);
                 buf = null;
-                
+
                 if (readBytes * 2 < session.getReadBufferSize()) {
                     if (session.getReadBufferSize() > 64) {
                         session.setReadBufferSize(session.getReadBufferSize() 
>>> 1);
@@ -259,10 +260,10 @@
             if (ret < 0) {
                 scheduleRemove(session);
             }
+        } catch (IOException e) {
+            scheduleRemove(session);
+            session.getFilterChain().fireExceptionCaught(session, e);
         } catch (Throwable e) {
-            if (e instanceof IOException) {
-                scheduleRemove(session);
-            }
             session.getFilterChain().fireExceptionCaught(session, e);
         }
     }
@@ -287,22 +288,22 @@
         notifyIdleness0(session, currentTime, session
                 .getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
                 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
-                        .getLastIdleTime(IdleStatus.BOTH_IDLE)));
+                .getLastIdleTime(IdleStatus.BOTH_IDLE)));
         notifyIdleness0(session, currentTime, session
                 .getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
                 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
-                        session.getLastIdleTime(IdleStatus.READER_IDLE)));
+                session.getLastIdleTime(IdleStatus.READER_IDLE)));
         notifyIdleness0(session, currentTime, session
                 .getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
                 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
-                        session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
+                session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
 
         notifyWriteTimeout(session, currentTime, session
                 .getConfig().getWriteTimeoutInMillis(), 
session.getLastWriteTime());
     }
 
     private void notifyIdleness0(SocketSessionImpl session, long currentTime,
-            long idleTime, IdleStatus status, long lastIoTime) {
+                                 long idleTime, IdleStatus status, long 
lastIoTime) {
         if (idleTime > 0 && lastIoTime != 0
                 && (currentTime - lastIoTime) >= idleTime) {
             session.increaseIdleCount(status);
@@ -311,7 +312,7 @@
     }
 
     private void notifyWriteTimeout(SocketSessionImpl session,
-            long currentTime, long writeTimeout, long lastIoTime) {
+                                    long currentTime, long writeTimeout, long 
lastIoTime) {
         SelectionKey key = session.getSelectionKey();
         if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
                 && key != null && key.isValid()
@@ -326,13 +327,15 @@
             return;
         }
 
-        for (;;) {
+        for (; ;) {
             SocketSessionImpl session = flushingSessions.poll();
 
             if (session == null) {
                 break;
             }
 
+            session.getInFlushQueue().set(false);
+
             if (!session.isConnected()) {
                 clearWriteRequestQueue(session);
                 continue;
@@ -352,7 +355,10 @@
             }
 
             try {
-                doFlush(session);
+                boolean flushedAll = doFlush(session);
+                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && 
!session.getInFlushQueue().get()) {
+                    scheduleFlush(session);
+                }
             } catch (IOException e) {
                 scheduleRemove(session);
                 session.getFilterChain().fireExceptionCaught(session, e);
@@ -364,16 +370,19 @@
         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
         WriteRequest req;
 
-        if ((req = writeRequestQueue.poll()) != null) {
+        while ((req = writeRequestQueue.poll()) != null) {
             Object m = req.getMessage();
             if (m instanceof ByteBuffer) {
                 ByteBuffer buf = (ByteBuffer) req.getMessage();
+
+                
session.getScheduledWriteBytesCounter().addAndGet(-buf.remaining());
+
                 // The first unwritten empty buffer must be
                 // forwarded to the filter chain.
                 if (buf.hasRemaining()) {
                     req.getFuture().setWritten(false);
                 } else {
-                    session.getFilterChain().fireMessageSent(session, req);    
                
+                    session.getFilterChain().fireMessageSent(session, req);
                 }
             } else {
                 req.getFuture().setWritten(false);
@@ -386,7 +395,7 @@
         }
     }
 
-    private void doFlush(SocketSessionImpl session) throws IOException {
+    private boolean doFlush(SocketSessionImpl session) throws IOException {
         // Clear OP_WRITE
         SelectionKey key = session.getSelectionKey();
         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -398,80 +407,72 @@
         int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
         try {
             do {
-                WriteRequest req;
-    
                 // Check for pending writes.
-                synchronized (writeRequestQueue) {
-                    req = writeRequestQueue.peek();
-                }
-    
+                WriteRequest req = writeRequestQueue.peek();
+
                 if (req == null) {
                     break;
                 }
-    
+
                 Object message = req.getMessage();
                 if (message instanceof FileRegion) {
-                    FileRegion region = (FileRegion) message; 
-    
+                    FileRegion region = (FileRegion) message;
+
                     if (region.getCount() <= 0) {
                         // File has been sent, remove from queue
-                        synchronized (writeRequestQueue) {
-                            writeRequestQueue.poll();
-                        }
+                        writeRequestQueue.poll();
                         session.increaseWrittenMessages();
                         session.getFilterChain().fireMessageSent(session, req);
                         continue;
                     }
-                    
+
                     if (key.isWritable()) {
-                        long localWrittenBytes = 
-                            
region.getFileChannel().transferTo(region.getPosition(), region.getCount(), ch);
+                        long localWrittenBytes =
+                                
region.getFileChannel().transferTo(region.getPosition(), region.getCount(), ch);
                         region.setPosition(region.getPosition() + 
localWrittenBytes);
                         writtenBytes += localWrittenBytes;
                     }
-                    
+
                     if (region.getCount() > 0 || writtenBytes >= 
maxWrittenBytes) {
                         // Kernel buffer is full or wrote too much.
                         key.interestOps(key.interestOps() | 
SelectionKey.OP_WRITE);
-                        break;
+                        return false;
                     }
-    
+
                 } else {
                     ByteBuffer buf = (ByteBuffer) message;
                     if (buf.remaining() == 0) {
                         // Buffer has been completely sent, remove request 
form queue
-                        synchronized (writeRequestQueue) {
-                            writeRequestQueue.poll();
-                        }
-    
+                        writeRequestQueue.poll();
+
                         session.increaseWrittenMessages();
-    
+
                         buf.reset();
                         session.getFilterChain().fireMessageSent(session, req);
                         continue;
                     }
-    
+
                     if (key.isWritable()) {
                         writtenBytes += ch.write(buf.buf());
                     }
-    
+
                     if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) 
{
                         // Kernel buffer is full or wrote too much.
                         key.interestOps(key.interestOps() | 
SelectionKey.OP_WRITE);
-                        break;
+                        return false;
                     }
                 }
             } while (writtenBytes < maxWrittenBytes);
         } finally {
             session.increaseWrittenBytes(writtenBytes);
         }
+
+        return true;
     }
 
     private void doUpdateTrafficMask() {
-        for (;;) {
-            SocketSessionImpl session;
-
-            session = trafficControllingSessions.poll();
+        for (; ;) {
+            SocketSessionImpl session = trafficControllingSessions.poll();
 
             if (session == null) {
                 break;
@@ -479,7 +480,7 @@
 
             SelectionKey key = session.getSelectionKey();
             // Retry later if session is not yet fully initialized.
-            // (In case that Session.suspend??() or session.resume??() is 
+            // (In case that Session.suspend??() or session.resume??() is
             // called before addSession() is processed)
             if (key == null) {
                 scheduleTrafficControl(session);
@@ -493,12 +494,8 @@
             // The normal is OP_READ and, if there are write requests in the
             // session's write queue, set OP_WRITE to trigger flushing.
             int ops = SelectionKey.OP_READ;
-            Queue<WriteRequest> writeRequestQueue = session
-                    .getWriteRequestQueue();
-            synchronized (writeRequestQueue) {
-                if (!writeRequestQueue.isEmpty()) {
-                    ops |= SelectionKey.OP_WRITE;
-                }
+            if (!session.getWriteRequestQueue().isEmpty()) {
+                ops |= SelectionKey.OP_WRITE;
             }
 
             // Now mask the preferred ops with the mask of the current session
@@ -511,7 +508,7 @@
         public void run() {
             Thread.currentThread().setName(SocketIoProcessor.this.threadName);
 
-            for (;;) {
+            for (; ;) {
                 try {
                     int nKeys = selector.select(1000);
                     doAddNew();
@@ -527,8 +524,7 @@
 
                     if (selector.keys().isEmpty()) {
                         synchronized (lock) {
-                            if (selector.keys().isEmpty()
-                                    && newSessions.isEmpty()) {
+                            if (selector.keys().isEmpty() && newSessions.isEmpty()) {
                                 worker = null;
                                 break;
                             }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Mon Sep  3 21:28:47 2007
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.socket.nio;
 
@@ -23,8 +23,10 @@
 import java.net.SocketException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
-import java.util.LinkedList;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.mina.common.AbstractIoSession;
 import org.apache.mina.common.ByteBuffer;
@@ -33,9 +35,9 @@
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
-import org.apache.mina.common.TransportMetadata;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.TransportMetadata;
 import org.apache.mina.common.WriteRequest;
 
 /**
@@ -45,13 +47,13 @@
  * @version $Rev$, $Date$
  */
 class SocketSessionImpl extends AbstractIoSession implements SocketSession {
-    
-    static final TransportMetadata METADATA = 
-        new DefaultTransportMetadata(
-                "socket", false, true,
-                InetSocketAddress.class,
-                SocketSessionConfig.class,
-                ByteBuffer.class, FileRegion.class);
+
+    static final TransportMetadata METADATA =
+            new DefaultTransportMetadata(
+                    "socket", false, true,
+                    InetSocketAddress.class,
+                    SocketSessionConfig.class,
+                    ByteBuffer.class, FileRegion.class);
 
     private final IoService service;
 
@@ -67,20 +69,20 @@
 
     private final IoHandler handler;
 
+    private final AtomicBoolean inFlushQueue = new AtomicBoolean(false);
+
+    private final AtomicLong scheduledWriteBytes = new AtomicLong();
+
     private SelectionKey key;
 
     private int readBufferSize = 1024;
 
-    /**
-     * Creates a new instance.
-     */
-    SocketSessionImpl(IoService service, SocketIoProcessor ioProcessor,
-            SocketChannel ch) {
+    SocketSessionImpl(IoService service, SocketIoProcessor ioProcessor, SocketChannel ch) {
         this.service = service;
         this.ioProcessor = ioProcessor;
         this.filterChain = new SocketFilterChain(this);
         this.ch = ch;
-        this.writeRequestQueue = new LinkedList<WriteRequest>();
+        this.writeRequestQueue = new ConcurrentLinkedQueue<WriteRequest>();
         this.handler = service.getHandler();
         this.config.setAll(service.getSessionConfig());
     }
@@ -100,7 +102,7 @@
     public IoFilterChain getFilterChain() {
         return filterChain;
     }
-    
+
     public TransportMetadata getTransportMetadata() {
         return METADATA;
     }
@@ -132,33 +134,34 @@
 
     public int getScheduledWriteMessages() {
         int size = 0;
-        synchronized (writeRequestQueue) {
-            for (WriteRequest request : writeRequestQueue) {
-                Object message = request.getMessage();
-                if (message instanceof ByteBuffer) {
-                    if (((ByteBuffer) message).hasRemaining()) {
-                        size ++;
-                    }
-                } else {
-                    size ++;
+
+        for (WriteRequest request : writeRequestQueue) {
+            Object message = request.getMessage();
+            if (message instanceof ByteBuffer) {
+                if (((ByteBuffer) message).hasRemaining()) {
+                    size++;
                 }
+            } else {
+                size++;
             }
         }
 
         return size;
     }
 
-    public int getScheduledWriteBytes() {
-        int size = 0;
-        synchronized (writeRequestQueue) {
-            for (Object o : writeRequestQueue) {
-                if (o instanceof ByteBuffer) {
-                    size += ((ByteBuffer) o).remaining();
-                }
-            }
-        }
+    public long getScheduledWriteBytes() {
+        return scheduledWriteBytes.get();
+    }
 
-        return size;
+    @Override
+    public void increaseWrittenBytes(long increment) {
+        super.increaseWrittenBytes(increment);
+
+        scheduledWriteBytes.addAndGet(-increment);
+    }
+
+    AtomicLong getScheduledWriteBytesCounter() {
+        return scheduledWriteBytes;
     }
 
     @Override
@@ -187,12 +190,16 @@
     int getReadBufferSize() {
         return readBufferSize;
     }
-    
+
     void setReadBufferSize(int readBufferSize) {
         this.readBufferSize = readBufferSize;
     }
 
-    private class SessionConfigImpl extends AbstractSocketSessionConfig {
+    AtomicBoolean getInFlushQueue() {
+        return inFlushQueue;
+    }
+
+    private class SessionConfigImpl extends AbstractSocketSessionConfig implements SocketSessionConfig {
         public boolean isKeepAlive() {
             try {
                 return ch.socket().getKeepAlive();
@@ -338,22 +345,19 @@
             }
         }
     }
-    
+
     void queueWriteRequest(WriteRequest writeRequest) {
         if (writeRequest.getMessage() instanceof ByteBuffer) {
+            ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
             // SocketIoProcessor.doFlush() will reset it after write is finished
-            // because the buffer will be passed with messageSent event. 
-            ((ByteBuffer) writeRequest.getMessage()).mark();
+            // because the buffer will be passed with messageSent event.
+            buffer.mark();
+            scheduledWriteBytes.addAndGet(buffer.remaining());
         }
 
-        int writeRequestQueueSize;
-        synchronized (writeRequestQueue) {
-            writeRequestQueue.offer(writeRequest);
-            writeRequestQueueSize = writeRequestQueue.size();
-        }
+        writeRequestQueue.add(writeRequest);
 
-        if (writeRequestQueueSize == 1 && getTrafficMask().isWritable()) {
-            // Notify SocketIoProcessor only when writeRequestQueue was empty.
+        if (getTrafficMask().isWritable()) {
             getIoProcessor().flush(this);
         }
     }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java Mon Sep  3 21:28:47 2007
@@ -30,8 +30,8 @@
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoServiceListenerSupport;
-import org.apache.mina.common.TransportMetadata;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.TransportMetadata;
 import org.apache.mina.common.WriteRequest;
 
 /**
@@ -41,14 +41,14 @@
  * @version $Rev$, $Date$
  */
 class VmPipeSessionImpl extends AbstractIoSession implements VmPipeSession {
-    
-    static final TransportMetadata METADATA = 
-        new DefaultTransportMetadata(
-            "vmpipe", false, false,
-            VmPipeAddress.class,
-            VmPipeSessionConfig.class,
-            Object.class);
-    
+
+    static final TransportMetadata METADATA =
+            new DefaultTransportMetadata(
+                    "vmpipe", false, false,
+                    VmPipeAddress.class,
+                    VmPipeSessionConfig.class,
+                    Object.class);
+
     private static final VmPipeSessionConfig CONFIG = new DefaultVmPipeSessionConfig();
 
     private final IoService service;
@@ -71,12 +71,12 @@
 
     final BlockingQueue<Object> pendingDataQueue;
 
-    /**
+    /*
      * Constructor for client-side session.
      */
     VmPipeSessionImpl(IoService service,
-            IoServiceListenerSupport serviceListeners,
-            VmPipeAddress localAddress, IoHandler handler, VmPipe remoteEntry) {
+                      IoServiceListenerSupport serviceListeners,
+                      VmPipeAddress localAddress, IoHandler handler, VmPipe remoteEntry) {
         this.service = service;
         this.serviceListeners = serviceListeners;
         this.lock = new Object();
@@ -89,7 +89,7 @@
         remoteSession = new VmPipeSessionImpl(this, remoteEntry);
     }
 
-    /**
+    /*
      * Constructor for server-side session.
      */
     private VmPipeSessionImpl(VmPipeSessionImpl remoteSession, VmPipe entry) {
@@ -127,7 +127,7 @@
     public IoHandler getHandler() {
         return handler;
     }
-    
+
     public TransportMetadata getTransportMetadata() {
         return METADATA;
     }
@@ -146,7 +146,7 @@
         return 0;
     }
 
-    public int getScheduledWriteBytes() {
+    public long getScheduledWriteBytes() {
         return 0;
     }
 

Modified: mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
--- mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java (original)
+++ mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java Mon Sep  3 21:28:47 2007
@@ -6,23 +6,19 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.serial;
 
-import gnu.io.SerialPort;
-import gnu.io.SerialPortEvent;
-import gnu.io.SerialPortEventListener;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -31,6 +27,9 @@
 import java.util.Queue;
 import java.util.TooManyListenersException;
 
+import gnu.io.SerialPort;
+import gnu.io.SerialPortEvent;
+import gnu.io.SerialPortEventListener;
 import org.apache.mina.common.AbstractIoSession;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.DefaultTransportMetadata;
@@ -39,9 +38,9 @@
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
-import org.apache.mina.common.TransportMetadata;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.TransportMetadata;
 import org.apache.mina.common.WriteRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,9 +75,9 @@
     private final Logger log;
 
     static final TransportMetadata METADATA =
-        new DefaultTransportMetadata(
-            "serial", false, true, SerialAddress.class,
-            SerialSessionConfig.class, ByteBuffer.class);
+            new DefaultTransportMetadata(
+                    "serial", false, true, SerialAddress.class,
+                    SerialSessionConfig.class, ByteBuffer.class);
 
     SerialSession(IoService service, SerialAddress address, SerialPort port) {
         this.service = service;
@@ -107,7 +106,7 @@
     public IoHandler getHandler() {
         return ioHandler;
     }
-    
+
     public TransportMetadata getTransportMetadata() {
         return METADATA;
     }
@@ -131,10 +130,10 @@
                 Object message = request.getMessage();
                 if (message instanceof ByteBuffer) {
                     if (((ByteBuffer) message).hasRemaining()) {
-                        size ++;
+                        size++;
                     }
                 } else {
-                    size ++;
+                    size++;
                 }
             }
         }
@@ -142,7 +141,7 @@
         return size;
     }
 
-    public int getScheduledWriteBytes() {
+    public long getScheduledWriteBytes() {
         int size = 0;
         synchronized (writeRequestQueue) {
             for (Object o : writeRequestQueue) {
@@ -168,7 +167,7 @@
 
     /**
      * start handling streams
-     * 
+     *
      * @throws IOException
      * @throws TooManyListenersException
      */
@@ -205,7 +204,7 @@
     }
 
     private void flushWrites() {
-        for (;;) {
+        for (; ;) {
             WriteRequest req;
 
             synchronized (writeRequestQueue) {


Reply via email to