Author: proyal
Date: Mon Sep 3 21:28:47 2007
New Revision: 572512
URL: http://svn.apache.org/viewvc?rev=572512&view=rev
Log:
port forward r572031 from 1.1 branch. fixes for DIRMINA-429 and DIRMINA-430
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java Mon
Sep 3 21:28:47 2007
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.common;
@@ -25,17 +25,17 @@
/**
* A dummy {@link IoSession} for unit-testing or non-network-use of
* the classes that depends on {@link IoSession}.
- *
+ *
* @author The Apache MINA Project ([EMAIL PROTECTED])
* @version $Rev$, $Date$
*/
public class DummySession extends AbstractIoSession {
-
+
private static final TransportMetadata TRANSPORT_METADATA =
- new DefaultTransportMetadata(
- "dummy", false, false,
- SocketAddress.class, IoSessionConfig.class, Object.class);
-
+ new DefaultTransportMetadata(
+ "dummy", false, false,
+ SocketAddress.class, IoSessionConfig.class, Object.class);
+
private static final SocketAddress ANONYMOUS_ADDRESS = new SocketAddress()
{
private static final long serialVersionUID = -496112902353454179L;
@@ -44,7 +44,7 @@
return "?";
}
};
-
+
private volatile IoService service;
private volatile IoSessionConfig config = new AbstractIoSessionConfig() {
@@ -52,7 +52,7 @@
protected void doSetAll(IoSessionConfig config) {
}
};
-
+
private volatile IoFilterChain filterChain = new
AbstractIoFilterChain(this) {
@Override
protected void doClose(IoSession session) throws Exception {
@@ -64,7 +64,7 @@
fireMessageSent(session, writeRequest);
}
};
-
+
private volatile IoHandler handler = new IoHandlerAdapter();
private volatile SocketAddress localAddress = ANONYMOUS_ADDRESS;
private volatile SocketAddress remoteAddress = ANONYMOUS_ADDRESS;
@@ -100,14 +100,14 @@
return TRANSPORT_METADATA;
}
};
-
+
// Set meaningless default values.
acceptor.setHandler(new IoHandlerAdapter());
acceptor.setLocalAddress(ANONYMOUS_ADDRESS);
this.service = acceptor;
}
-
+
@Override
protected void updateTrafficMask() {
}
@@ -115,7 +115,7 @@
public IoSessionConfig getConfig() {
return config;
}
-
+
/**
* Sets the configuration of this session.
*/
@@ -123,14 +123,14 @@
if (config == null) {
throw new NullPointerException("config");
}
-
+
this.config = config;
}
public IoFilterChain getFilterChain() {
return filterChain;
}
-
+
/**
* Sets the filter chain that affects this session.
*/
@@ -138,14 +138,14 @@
if (filterChain == null) {
throw new NullPointerException("filterChain");
}
-
+
this.filterChain = filterChain;
}
public IoHandler getHandler() {
return handler;
}
-
+
/**
* Sets the {@link IoHandler} which handles this session.
*/
@@ -153,7 +153,7 @@
if (handler == null) {
throw new NullPointerException("handler");
}
-
+
this.handler = handler;
}
@@ -164,7 +164,7 @@
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
-
+
/**
* Sets the socket address of local machine which is associated with
* this session.
@@ -173,22 +173,22 @@
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
-
+
this.localAddress = localAddress;
}
-
+
/**
- * Sets the socket address of remote peer.
+ * Sets the socket address of remote peer.
*/
public void setRemoteAddress(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
-
+
this.remoteAddress = remoteAddress;
}
- public int getScheduledWriteBytes() {
+ public long getScheduledWriteBytes() {
return 0;
}
@@ -199,7 +199,7 @@
public IoService getService() {
return service;
}
-
+
/**
* Sets the {@link IoService} which provides I/O service to
this session.
*/
@@ -207,14 +207,14 @@
if (service == null) {
throw new NullPointerException("service");
}
-
+
this.service = service;
}
public TransportMetadata getTransportMetadata() {
return transportMetadata;
}
-
+
/**
* Sets the {@link TransportMetadata} that this session runs on.
*/
@@ -222,7 +222,7 @@
if (transportMetadata == null) {
throw new NullPointerException("transportMetadata");
}
-
+
this.transportMetadata = transportMetadata;
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java Mon Sep
3 21:28:47 2007
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.common;
@@ -23,32 +23,32 @@
import java.util.Set;
/**
- * A handle which represents connection between two end-points regardless of
+ * A handle which represents connection between two end-points regardless of
* transport types.
- * <p>
+ * <p/>
* {@link IoSession} provides user-defined attributes.
User-defined attributes
* are application-specific data which is associated with a session.
* It often contains objects that represents the state of a higher-level
protocol
* and becomes a way to exchange data between filters and handlers.
- *
+ * <p/>
* <h3>Adjusting Transport Type Specific Properties</h3>
- * <p>
+ * <p/>
* You can simply downcast the session to an appropriate subclass.
* </p>
- *
+ * <p/>
* <h3>Thread Safety</h3>
- * <p>
+ * <p/>
* {@link IoSession} is thread-safe. But please note that
performing
* more than one {@link #write(Object)} calls at the same time will
- * cause the {@link IoFilter#filterWrite(IoFilter.NextFilter,
IoSession, WriteRequest)}
+ * cause the {@link
IoFilter#filterWrite(IoFilter.NextFilter,IoSession,WriteRequest)}
* is executed simultaneously, and therefore you have to make sure the
- * {@link IoFilter} implementations you're using are thread-safe,
too.
+ * {@link IoFilter} implementations you're using are thread-safe,
too.
* </p>
- *
+ * <p/>
* <h3>Equality of Sessions</h3>
* {@link #equals(Object)} and {@link #hashCode()} shall
not be overriden
* to the default behavior that is defined in {@link Object}.
- *
+ *
* @author The Apache MINA Project ([EMAIL PROTECTED])
* @version $Rev$, $Date$
*/
@@ -73,7 +73,7 @@
* Returns the filter chain that only affects this session.
*/
IoFilterChain getFilterChain();
-
+
/**
* Returns the {@link TransportMetadata} that this session runs
on.
*/
@@ -81,7 +81,7 @@
/**
* Writes the specified <code>message</code> to remote peer. This
- * operation is asynchronous; {@link
IoHandler#messageSent(IoSession, Object)}
+ * operation is asynchronous; {@link
IoHandler#messageSent(IoSession,Object)}
* will be invoked when the message is actually sent to remote peer.
* You can also wait for the returned {@link WriteFuture} if
you want
* to wait for the message actually written.
@@ -104,14 +104,14 @@
/**
* Sets an attachment of this session.
* This method is identical with <tt>setAttribute( "", attachment )</tt>.
- *
+ *
* @return Old attachment. <tt>null</tt> if it is new.
*/
Object setAttachment(Object attachment);
/**
* Returns the value of user-defined attribute of this session.
- *
+ *
* @param key the key of the attribute
* @return <tt>null</tt> if there is no attribute with the specified key
*/
@@ -136,8 +136,8 @@
/**
* Sets a user-defined attribute.
- *
- * @param key the key of the attribute
+ *
+ * @param key the key of the attribute
* @param value the value of the attribute
* @return The old value of the attribute. <tt>null</tt> if it is new.
*/
@@ -147,7 +147,7 @@
* Sets a user defined attribute without a value. This is useful when
* you just want to put a 'mark' attribute. Its value is set to
* {@link Boolean#TRUE}.
- *
+ *
* @param key the key of the attribute
* @return The old value of the attribute. <tt>null</tt> if it is new.
*/
@@ -169,7 +169,7 @@
/**
* Removes a user-defined attribute with the specified key.
- *
+ *
* @return The old value of the attribute. <tt>null</tt> if not found.
*/
Object removeAttribute(String key);
@@ -235,7 +235,7 @@
CloseFuture getCloseFuture();
/**
- * Returns the socket address of remote peer.
+ * Returns the socket address of remote peer.
*/
SocketAddress getRemoteAddress();
@@ -251,7 +251,7 @@
* returns the {@link SocketAddress} which is specified as a
parameter of
* {@link IoAcceptor#bind()}. If this session is managed by
* {@link IoConnector}, this method returns the same address
with
- * that of {@link #getRemoteAddress()}.
+ * that of {@link #getRemoteAddress()}.
*/
SocketAddress getServiceAddress();
@@ -302,7 +302,7 @@
long getWrittenBytes();
/**
- * Returns the total number of messages which were read and decoded from
this session.
+ * Returns the total number of messages which were read and decoded from
this session.
*/
long getReadMessages();
@@ -320,7 +320,7 @@
* Returns the number of bytes which are scheduled to be written to this
* session.
*/
- int getScheduledWriteBytes();
+ long getScheduledWriteBytes();
/**
* Returns the time in millis when this session is created.
@@ -343,7 +343,7 @@
long getLastWriteTime();
/**
- * Returns <code>true</code> if this session is idle for the specified
+ * Returns <code>true</code> if this session is idle for the specified
* {@link IdleStatus}.
*/
boolean isIdle(IdleStatus status);
@@ -351,7 +351,7 @@
/**
* Returns the number of the fired continuous <tt>sessionIdle</tt> events
* for the specified {@link IdleStatus}.
- * <p>
+ * <p/>
* If <tt>sessionIdle</tt> event is fired first after some time after I/O,
* <tt>idleCount</tt> becomes <tt>1</tt>. <tt>idleCount</tt> resets to
* <tt>0</tt> if any I/O occurs again, otherwise it increases to
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
Mon Sep 3 21:28:47 2007
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.socket.nio;
@@ -34,26 +34,26 @@
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
-import org.apache.mina.common.TransportMetadata;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.TransportMetadata;
import org.apache.mina.common.WriteFuture;
import org.apache.mina.common.WriteRequest;
/**
* An {@link IoSession} for datagram transport (UDP/IP).
- *
+ *
* @author The Apache MINA Project ([EMAIL PROTECTED])
* @version $Rev$, $Date$
*/
class DatagramSessionImpl extends AbstractIoSession implements DatagramSession
{
-
+
static final TransportMetadata METADATA =
- new DefaultTransportMetadata(
- "datagram", true, false,
- InetSocketAddress.class,
- DatagramSessionConfig.class, ByteBuffer.class);
-
+ new DefaultTransportMetadata(
+ "datagram", true, false,
+ InetSocketAddress.class,
+ DatagramSessionConfig.class, ByteBuffer.class);
+
private final IoService service;
private final DatagramSessionConfig config = new SessionConfigImpl();
@@ -99,7 +99,7 @@
* Creates a new connector instance.
*/
DatagramSessionImpl(DatagramConnector service,
- DatagramChannel ch, IoHandler defaultHandler) {
+ DatagramChannel ch, IoHandler defaultHandler) {
this.service = service;
this.ch = ch;
this.handler = defaultHandler;
@@ -138,7 +138,7 @@
public IoHandler getHandler() {
return handler;
}
-
+
public TransportMetadata getTransportMetadata() {
return METADATA;
}
@@ -177,10 +177,10 @@
Object message = request.getMessage();
if (message instanceof ByteBuffer) {
if (((ByteBuffer) message).hasRemaining()) {
- size ++;
+ size++;
}
} else {
- size ++;
+ size++;
}
}
}
@@ -188,7 +188,7 @@
return size;
}
- public int getScheduledWriteBytes() {
+ public long getScheduledWriteBytes() {
int size = 0;
synchronized (writeRequestQueue) {
for (Object o : writeRequestQueue) {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
Mon Sep 3 21:28:47 2007
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.socket.nio;
@@ -28,7 +28,7 @@
/**
* An {@link IoFilterChain} for socket transport (TCP/IP).
- *
+ *
* @author The Apache MINA Project ([EMAIL PROTECTED])
*/
class SocketFilterChain extends AbstractIoFilterChain {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
Mon Sep 3 21:28:47 2007
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.socket.nio;
@@ -23,7 +23,6 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
-import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -31,12 +30,12 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.FileRegion;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoServiceListenerSupport;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.RuntimeIOException;
-import org.apache.mina.common.FileRegion;
import org.apache.mina.common.WriteRequest;
import org.apache.mina.common.WriteTimeoutException;
import org.apache.mina.util.NamePreservingRunnable;
@@ -111,8 +110,7 @@
void flush(SocketSessionImpl session) {
boolean needsWakeup = flushingSessions.isEmpty();
- scheduleFlush(session);
- if (needsWakeup) {
+ if (scheduleFlush(session) && needsWakeup) {
selector.wakeup();
}
}
@@ -126,8 +124,14 @@
removingSessions.offer(session);
}
- private void scheduleFlush(SocketSessionImpl session) {
- flushingSessions.offer(session);
+ private boolean scheduleFlush(SocketSessionImpl session) {
+ if (session.getInFlushQueue().compareAndSet(false, true)) {
+ flushingSessions.add(session);
+
+ return true;
+ }
+
+ return false;
}
private void scheduleTrafficControl(SocketSessionImpl session) {
@@ -135,7 +139,7 @@
}
private void doAddNew() {
- for (;;) {
+ for (; ;) {
SocketSessionImpl session = newSessions.poll();
if (session == null) {
@@ -169,7 +173,7 @@
}
private void doRemove() {
- for (;;) {
+ for (; ;) {
SocketSessionImpl session = removingSessions.poll();
if (session == null) {
@@ -202,10 +206,7 @@
}
private void process(Set<SelectionKey> selectedKeys) {
- Iterator<SelectionKey> it = selectedKeys.iterator();
-
- while (it.hasNext()) {
- SelectionKey key = it.next();
+ for (SelectionKey key : selectedKeys) {
SocketSessionImpl session = (SocketSessionImpl) key.attachment();
if (key.isReadable() && session.getTrafficMask().isReadable()) {
@@ -241,7 +242,7 @@
if (readBytes > 0) {
session.getFilterChain().fireMessageReceived(session, buf);
buf = null;
-
+
if (readBytes * 2 < session.getReadBufferSize()) {
if (session.getReadBufferSize() > 64) {
session.setReadBufferSize(session.getReadBufferSize()
>>> 1);
@@ -259,10 +260,10 @@
if (ret < 0) {
scheduleRemove(session);
}
+ } catch (IOException e) {
+ scheduleRemove(session);
+ session.getFilterChain().fireExceptionCaught(session, e);
} catch (Throwable e) {
- if (e instanceof IOException) {
- scheduleRemove(session);
- }
session.getFilterChain().fireExceptionCaught(session, e);
}
}
@@ -287,22 +288,22 @@
notifyIdleness0(session, currentTime, session
.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
- .getLastIdleTime(IdleStatus.BOTH_IDLE)));
+ .getLastIdleTime(IdleStatus.BOTH_IDLE)));
notifyIdleness0(session, currentTime, session
.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
- session.getLastIdleTime(IdleStatus.READER_IDLE)));
+ session.getLastIdleTime(IdleStatus.READER_IDLE)));
notifyIdleness0(session, currentTime, session
.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
- session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
+ session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
notifyWriteTimeout(session, currentTime, session
.getConfig().getWriteTimeoutInMillis(),
session.getLastWriteTime());
}
private void notifyIdleness0(SocketSessionImpl session, long currentTime,
- long idleTime, IdleStatus status, long lastIoTime) {
+ long idleTime, IdleStatus status, long
lastIoTime) {
if (idleTime > 0 && lastIoTime != 0
&& (currentTime - lastIoTime) >= idleTime) {
session.increaseIdleCount(status);
@@ -311,7 +312,7 @@
}
private void notifyWriteTimeout(SocketSessionImpl session,
- long currentTime, long writeTimeout, long lastIoTime) {
+ long currentTime, long writeTimeout, long
lastIoTime) {
SelectionKey key = session.getSelectionKey();
if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
&& key != null && key.isValid()
@@ -326,13 +327,15 @@
return;
}
- for (;;) {
+ for (; ;) {
SocketSessionImpl session = flushingSessions.poll();
if (session == null) {
break;
}
+ session.getInFlushQueue().set(false);
+
if (!session.isConnected()) {
clearWriteRequestQueue(session);
continue;
@@ -352,7 +355,10 @@
}
try {
- doFlush(session);
+ boolean flushedAll = doFlush(session);
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty() &&
!session.getInFlushQueue().get()) {
+ scheduleFlush(session);
+ }
} catch (IOException e) {
scheduleRemove(session);
session.getFilterChain().fireExceptionCaught(session, e);
@@ -364,16 +370,19 @@
Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
WriteRequest req;
- if ((req = writeRequestQueue.poll()) != null) {
+ while ((req = writeRequestQueue.poll()) != null) {
Object m = req.getMessage();
if (m instanceof ByteBuffer) {
ByteBuffer buf = (ByteBuffer) req.getMessage();
+
+
session.getScheduledWriteBytesCounter().addAndGet(-buf.remaining());
+
// The first unwritten empty buffer must be
// forwarded to the filter chain.
if (buf.hasRemaining()) {
req.getFuture().setWritten(false);
} else {
- session.getFilterChain().fireMessageSent(session, req);
+ session.getFilterChain().fireMessageSent(session, req);
}
} else {
req.getFuture().setWritten(false);
@@ -386,7 +395,7 @@
}
}
- private void doFlush(SocketSessionImpl session) throws IOException {
+ private boolean doFlush(SocketSessionImpl session) throws IOException {
// Clear OP_WRITE
SelectionKey key = session.getSelectionKey();
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -398,80 +407,72 @@
int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
try {
do {
- WriteRequest req;
-
// Check for pending writes.
- synchronized (writeRequestQueue) {
- req = writeRequestQueue.peek();
- }
-
+ WriteRequest req = writeRequestQueue.peek();
+
if (req == null) {
break;
}
-
+
Object message = req.getMessage();
if (message instanceof FileRegion) {
- FileRegion region = (FileRegion) message;
-
+ FileRegion region = (FileRegion) message;
+
if (region.getCount() <= 0) {
// File has been sent, remove from queue
- synchronized (writeRequestQueue) {
- writeRequestQueue.poll();
- }
+ writeRequestQueue.poll();
session.increaseWrittenMessages();
session.getFilterChain().fireMessageSent(session, req);
continue;
}
-
+
if (key.isWritable()) {
- long localWrittenBytes =
-
region.getFileChannel().transferTo(region.getPosition(), region.getCount(), ch);
+ long localWrittenBytes =
+
region.getFileChannel().transferTo(region.getPosition(), region.getCount(), ch);
region.setPosition(region.getPosition() +
localWrittenBytes);
writtenBytes += localWrittenBytes;
}
-
+
if (region.getCount() > 0 || writtenBytes >=
maxWrittenBytes) {
// Kernel buffer is full or wrote too much.
key.interestOps(key.interestOps() |
SelectionKey.OP_WRITE);
- break;
+ return false;
}
-
+
} else {
ByteBuffer buf = (ByteBuffer) message;
if (buf.remaining() == 0) {
// Buffer has been completely sent, remove request
form queue
- synchronized (writeRequestQueue) {
- writeRequestQueue.poll();
- }
-
+ writeRequestQueue.poll();
+
session.increaseWrittenMessages();
-
+
buf.reset();
session.getFilterChain().fireMessageSent(session, req);
continue;
}
-
+
if (key.isWritable()) {
writtenBytes += ch.write(buf.buf());
}
-
+
if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes)
{
// Kernel buffer is full or wrote too much.
key.interestOps(key.interestOps() |
SelectionKey.OP_WRITE);
- break;
+ return false;
}
}
} while (writtenBytes < maxWrittenBytes);
} finally {
session.increaseWrittenBytes(writtenBytes);
}
+
+ return true;
}
private void doUpdateTrafficMask() {
- for (;;) {
- SocketSessionImpl session;
-
- session = trafficControllingSessions.poll();
+ for (; ;) {
+ SocketSessionImpl session = trafficControllingSessions.poll();
if (session == null) {
break;
@@ -479,7 +480,7 @@
SelectionKey key = session.getSelectionKey();
// Retry later if session is not yet fully initialized.
- // (In case that Session.suspend??() or session.resume??() is
+ // (In case that Session.suspend??() or session.resume??() is
// called before addSession() is processed)
if (key == null) {
scheduleTrafficControl(session);
@@ -493,12 +494,8 @@
// The normal is OP_READ and, if there are write requests in the
// session's write queue, set OP_WRITE to trigger flushing.
int ops = SelectionKey.OP_READ;
- Queue<WriteRequest> writeRequestQueue = session
- .getWriteRequestQueue();
- synchronized (writeRequestQueue) {
- if (!writeRequestQueue.isEmpty()) {
- ops |= SelectionKey.OP_WRITE;
- }
+ if (!session.getWriteRequestQueue().isEmpty()) {
+ ops |= SelectionKey.OP_WRITE;
}
// Now mask the preferred ops with the mask of the current session
@@ -511,7 +508,7 @@
public void run() {
Thread.currentThread().setName(SocketIoProcessor.this.threadName);
- for (;;) {
+ for (; ;) {
try {
int nKeys = selector.select(1000);
doAddNew();
@@ -527,8 +524,7 @@
if (selector.keys().isEmpty()) {
synchronized (lock) {
- if (selector.keys().isEmpty()
- && newSessions.isEmpty()) {
+ if (selector.keys().isEmpty() &&
newSessions.isEmpty()) {
worker = null;
break;
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
Mon Sep 3 21:28:47 2007
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.socket.nio;
@@ -23,8 +23,10 @@
import java.net.SocketException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
-import java.util.LinkedList;
import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.mina.common.AbstractIoSession;
import org.apache.mina.common.ByteBuffer;
@@ -33,9 +35,9 @@
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
-import org.apache.mina.common.TransportMetadata;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.TransportMetadata;
import org.apache.mina.common.WriteRequest;
/**
@@ -45,13 +47,13 @@
* @version $Rev$, $Date$
*/
class SocketSessionImpl extends AbstractIoSession implements SocketSession {
-
- static final TransportMetadata METADATA =
- new DefaultTransportMetadata(
- "socket", false, true,
- InetSocketAddress.class,
- SocketSessionConfig.class,
- ByteBuffer.class, FileRegion.class);
+
+ static final TransportMetadata METADATA =
+ new DefaultTransportMetadata(
+ "socket", false, true,
+ InetSocketAddress.class,
+ SocketSessionConfig.class,
+ ByteBuffer.class, FileRegion.class);
private final IoService service;
@@ -67,20 +69,20 @@
private final IoHandler handler;
+ private final AtomicBoolean inFlushQueue = new AtomicBoolean(false);
+
+ private final AtomicLong scheduledWriteBytes = new AtomicLong();
+
private SelectionKey key;
private int readBufferSize = 1024;
- /**
- * Creates a new instance.
- */
- SocketSessionImpl(IoService service, SocketIoProcessor ioProcessor,
- SocketChannel ch) {
+ SocketSessionImpl(IoService service, SocketIoProcessor ioProcessor,
SocketChannel ch) {
this.service = service;
this.ioProcessor = ioProcessor;
this.filterChain = new SocketFilterChain(this);
this.ch = ch;
- this.writeRequestQueue = new LinkedList<WriteRequest>();
+ this.writeRequestQueue = new ConcurrentLinkedQueue<WriteRequest>();
this.handler = service.getHandler();
this.config.setAll(service.getSessionConfig());
}
@@ -100,7 +102,7 @@
public IoFilterChain getFilterChain() {
return filterChain;
}
-
+
public TransportMetadata getTransportMetadata() {
return METADATA;
}
@@ -132,33 +134,34 @@
public int getScheduledWriteMessages() {
int size = 0;
- synchronized (writeRequestQueue) {
- for (WriteRequest request : writeRequestQueue) {
- Object message = request.getMessage();
- if (message instanceof ByteBuffer) {
- if (((ByteBuffer) message).hasRemaining()) {
- size ++;
- }
- } else {
- size ++;
+
+ for (WriteRequest request : writeRequestQueue) {
+ Object message = request.getMessage();
+ if (message instanceof ByteBuffer) {
+ if (((ByteBuffer) message).hasRemaining()) {
+ size++;
}
+ } else {
+ size++;
}
}
return size;
}
- public int getScheduledWriteBytes() {
- int size = 0;
- synchronized (writeRequestQueue) {
- for (Object o : writeRequestQueue) {
- if (o instanceof ByteBuffer) {
- size += ((ByteBuffer) o).remaining();
- }
- }
- }
+ public long getScheduledWriteBytes() {
+ return scheduledWriteBytes.get();
+ }
- return size;
+ @Override
+ public void increaseWrittenBytes(long increment) {
+ super.increaseWrittenBytes(increment);
+
+ scheduledWriteBytes.addAndGet(-increment);
+ }
+
+ AtomicLong getScheduledWriteBytesCounter() {
+ return scheduledWriteBytes;
}
@Override
@@ -187,12 +190,16 @@
int getReadBufferSize() {
return readBufferSize;
}
-
+
void setReadBufferSize(int readBufferSize) {
this.readBufferSize = readBufferSize;
}
- private class SessionConfigImpl extends AbstractSocketSessionConfig {
+ AtomicBoolean getInFlushQueue() {
+ return inFlushQueue;
+ }
+
+ private class SessionConfigImpl extends AbstractSocketSessionConfig implements SocketSessionConfig {
public boolean isKeepAlive() {
try {
return ch.socket().getKeepAlive();
@@ -338,22 +345,19 @@
}
}
}
-
+
void queueWriteRequest(WriteRequest writeRequest) {
if (writeRequest.getMessage() instanceof ByteBuffer) {
+ ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
// SocketIoProcessor.doFlush() will reset it after write is finished
- // because the buffer will be passed with messageSent event.
- ((ByteBuffer) writeRequest.getMessage()).mark();
+ // because the buffer will be passed with messageSent event.
+ buffer.mark();
+ scheduledWriteBytes.addAndGet(buffer.remaining());
}
- int writeRequestQueueSize;
- synchronized (writeRequestQueue) {
- writeRequestQueue.offer(writeRequest);
- writeRequestQueueSize = writeRequestQueue.size();
- }
+ writeRequestQueue.add(writeRequest);
- if (writeRequestQueueSize == 1 && getTrafficMask().isWritable()) {
- // Notify SocketIoProcessor only when writeRequestQueue was empty.
+ if (getTrafficMask().isWritable()) {
getIoProcessor().flush(this);
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java Mon Sep 3 21:28:47 2007
@@ -30,8 +30,8 @@
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoServiceListenerSupport;
-import org.apache.mina.common.TransportMetadata;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.TransportMetadata;
import org.apache.mina.common.WriteRequest;
/**
@@ -41,14 +41,14 @@
* @version $Rev$, $Date$
*/
class VmPipeSessionImpl extends AbstractIoSession implements VmPipeSession {
-
- static final TransportMetadata METADATA =
- new DefaultTransportMetadata(
- "vmpipe", false, false,
- VmPipeAddress.class,
- VmPipeSessionConfig.class,
- Object.class);
-
+
+ static final TransportMetadata METADATA =
+ new DefaultTransportMetadata(
+ "vmpipe", false, false,
+ VmPipeAddress.class,
+ VmPipeSessionConfig.class,
+ Object.class);
+
private static final VmPipeSessionConfig CONFIG = new DefaultVmPipeSessionConfig();
private final IoService service;
@@ -71,12 +71,12 @@
final BlockingQueue<Object> pendingDataQueue;
- /**
+ /*
* Constructor for client-side session.
*/
VmPipeSessionImpl(IoService service,
- IoServiceListenerSupport serviceListeners,
- VmPipeAddress localAddress, IoHandler handler, VmPipe remoteEntry) {
+ IoServiceListenerSupport serviceListeners,
+ VmPipeAddress localAddress, IoHandler handler, VmPipe remoteEntry) {
this.service = service;
this.serviceListeners = serviceListeners;
this.lock = new Object();
@@ -89,7 +89,7 @@
remoteSession = new VmPipeSessionImpl(this, remoteEntry);
}
- /**
+ /*
* Constructor for server-side session.
*/
private VmPipeSessionImpl(VmPipeSessionImpl remoteSession, VmPipe entry) {
@@ -127,7 +127,7 @@
public IoHandler getHandler() {
return handler;
}
-
+
public TransportMetadata getTransportMetadata() {
return METADATA;
}
@@ -146,7 +146,7 @@
return 0;
}
- public int getScheduledWriteBytes() {
+ public long getScheduledWriteBytes() {
return 0;
}
Modified: mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java?rev=572512&r1=572511&r2=572512&view=diff
==============================================================================
--- mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java (original)
+++ mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java Mon Sep 3 21:28:47 2007
@@ -6,23 +6,19 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.serial;
-import gnu.io.SerialPort;
-import gnu.io.SerialPortEvent;
-import gnu.io.SerialPortEventListener;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -31,6 +27,9 @@
import java.util.Queue;
import java.util.TooManyListenersException;
+import gnu.io.SerialPort;
+import gnu.io.SerialPortEvent;
+import gnu.io.SerialPortEventListener;
import org.apache.mina.common.AbstractIoSession;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.DefaultTransportMetadata;
@@ -39,9 +38,9 @@
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
-import org.apache.mina.common.TransportMetadata;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.TransportMetadata;
import org.apache.mina.common.WriteRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,9 +75,9 @@
private final Logger log;
static final TransportMetadata METADATA =
- new DefaultTransportMetadata(
- "serial", false, true, SerialAddress.class,
- SerialSessionConfig.class, ByteBuffer.class);
+ new DefaultTransportMetadata(
+ "serial", false, true, SerialAddress.class,
+ SerialSessionConfig.class, ByteBuffer.class);
SerialSession(IoService service, SerialAddress address, SerialPort port) {
this.service = service;
@@ -107,7 +106,7 @@
public IoHandler getHandler() {
return ioHandler;
}
-
+
public TransportMetadata getTransportMetadata() {
return METADATA;
}
@@ -131,10 +130,10 @@
Object message = request.getMessage();
if (message instanceof ByteBuffer) {
if (((ByteBuffer) message).hasRemaining()) {
- size ++;
+ size++;
}
} else {
- size ++;
+ size++;
}
}
}
@@ -142,7 +141,7 @@
return size;
}
- public int getScheduledWriteBytes() {
+ public long getScheduledWriteBytes() {
int size = 0;
synchronized (writeRequestQueue) {
for (Object o : writeRequestQueue) {
@@ -168,7 +167,7 @@
/**
* start handling streams
- *
+ *
* @throws IOException
* @throws TooManyListenersException
*/
@@ -205,7 +204,7 @@
}
private void flushWrites() {
- for (;;) {
+ for (; ;) {
WriteRequest req;
synchronized (writeRequestQueue) {