Author: dejanb
Date: Wed Mar 10 11:48:08 2010
New Revision: 921318
URL: http://svn.apache.org/viewvc?rev=921318&view=rev
Log:
merging 897262,897939,898774,916762,916780,920325,920330,920827,920838,920881 -
https://issues.apache.org/activemq/browse/AMQ-2440 - stomp+nio
Added:
activemq/branches/activemq-5.3/assembly/src/sample-conf/activemq-stomp.xml
- copied unchanged from r897262,
activemq/trunk/assembly/src/sample-conf/activemq-stomp.xml
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Wed Mar 10 11:48:08 2010
@@ -77,6 +77,7 @@ import org.apache.activemq.state.Consume
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState;
+import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -91,6 +92,8 @@ import org.apache.activemq.util.ServiceS
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import static org.apache.activemq.thread.DefaultThreadPools.*;
/**
* @version $Revision: 1.8 $
*/
@@ -908,8 +911,7 @@ public class TransportConnection impleme
cs.getContext().getStopping().set(true);
}
try {
- new Thread("ActiveMQ Transport Stopper: " +
transport.getRemoteAddress()) {
- @Override
+ getDefaultTaskRunnerFactory().execute(new Runnable(){
public void run() {
serviceLock.writeLock().lock();
try {
@@ -922,7 +924,7 @@ public class TransportConnection impleme
serviceLock.writeLock().unlock();
}
}
- }.start();
+ });
} catch (Throwable t) {
LOG.warn("cannot create async transport stopper thread.. not
waiting for stop to complete, reason:", t);
stopped.countDown();
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
Wed Mar 10 11:48:08 2010
@@ -21,6 +21,7 @@ import org.apache.activemq.broker.jmx.Ma
import org.apache.activemq.broker.region.ConnectorStatistics;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.security.MessageAuthorizationPolicy;
+import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
@@ -32,6 +33,9 @@ import org.apache.activemq.util.ServiceS
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import static org.apache.activemq.thread.DefaultThreadPools.*;
+
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -202,9 +206,7 @@ public class TransportConnector implemen
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) {
try {
- // Starting the connection could block due to
- // wireformat negotiation, so start it in an async thread.
- Thread startThread = new Thread("ActiveMQ Transport
Initiator: " + transport.getRemoteAddress()) {
+ getDefaultTaskRunnerFactory().execute(new Runnable(){
public void run() {
try {
Connection connection =
createConnection(transport);
@@ -214,8 +216,7 @@ public class TransportConnector implemen
onAcceptError(e);
}
}
- };
- startThread.start();
+ });
} catch (Exception e) {
String remoteHost = transport.getRemoteAddress();
ServiceSupport.dispose(transport);
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
Wed Mar 10 11:48:08 2010
@@ -26,24 +26,24 @@ import java.util.concurrent.ThreadFactor
*/
public final class DefaultThreadPools {
- private static final Executor DEFAULT_POOL;
- static {
- DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "ActiveMQ Default Thread
Pool Thread");
- thread.setDaemon(true);
- return thread;
- }
- });
- }
+// private static final Executor DEFAULT_POOL;
+// static {
+// DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new
ThreadFactory() {
+// public Thread newThread(Runnable runnable) {
+// Thread thread = new Thread(runnable, "ActiveMQ Default
Thread Pool Thread");
+// thread.setDaemon(true);
+// return thread;
+// }
+// });
+// }
private static final TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new
TaskRunnerFactory();
private DefaultThreadPools() {
}
- public static Executor getDefaultPool() {
- return DEFAULT_POOL;
- }
+// public static Executor getDefaultPool() {
+// return DEFAULT_POOL;
+// }
public static TaskRunnerFactory getDefaultTaskRunnerFactory() {
return DEFAULT_TASK_RUNNER_FACTORY;
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
Wed Mar 10 11:48:08 2010
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.thread;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@@ -31,7 +32,7 @@ import java.util.concurrent.TimeUnit;
*
* @version $Revision: 1.5 $
*/
-public class TaskRunnerFactory {
+public class TaskRunnerFactory implements Executor {
private ExecutorService executor;
private int maxIterationsPerRun;
@@ -80,6 +81,18 @@ public class TaskRunnerFactory {
}
}
+ public void execute(Runnable runnable) {
+ execute(runnable, "ActiveMQ Task");
+ }
+
+ public void execute(Runnable runnable, String name) {
+ if (executor != null) {
+ executor.execute(runnable);
+ } else {
+ new Thread(runnable, name).start();
+ }
+ }
+
protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
Wed Mar 10 11:48:08 2010
@@ -149,7 +149,7 @@ public class NIOTransport extends TcpTra
}
protected void doStop(ServiceStopper stopper) throws Exception {
- selection.disable();
+ selection.close();
super.doStop(stopper);
}
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
Wed Mar 10 11:48:08 2010
@@ -20,8 +20,11 @@ import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* The SelectorManager will manage one Selector and the thread that checks the
@@ -36,16 +39,20 @@ public final class SelectorManager {
public static final SelectorManager SINGLETON = new SelectorManager();
- private Executor selectorExecutor = Executors.newCachedThreadPool(new
ThreadFactory() {
- public Thread newThread(Runnable r) {
- Thread rc = new Thread(r);
- rc.setName("NIO Transport Thread");
- return rc;
- }
- });
+ private Executor selectorExecutor = createDefaultExecutor();
private Executor channelExecutor = selectorExecutor;
private LinkedList<SelectorWorker> freeWorkers = new
LinkedList<SelectorWorker>();
- private int maxChannelsPerWorker = 64;
+ private int maxChannelsPerWorker = 1024;
+
+ protected ExecutorService createDefaultExecutor() {
+ ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
10, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new
ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ return new Thread(runnable, "ActiveMQ NIO Worker");
+ }
+ });
+ // rc.allowCoreThreadTimeOut(true);
+ return rc;
+ }
public static SelectorManager getInstance() {
return SINGLETON;
@@ -61,15 +68,25 @@ public final class SelectorManager {
public synchronized SelectorSelection register(SocketChannel
socketChannel, Listener listener)
throws IOException {
- SelectorWorker worker = null;
- if (freeWorkers.size() > 0) {
- worker = freeWorkers.getFirst();
- } else {
- worker = new SelectorWorker(this);
- freeWorkers.addFirst(worker);
+ SelectorSelection selection = null;
+ while( selection == null ) {
+ if (freeWorkers.size() > 0) {
+ SelectorWorker worker = freeWorkers.getFirst();
+ if( worker.isReleased() ) {
+ freeWorkers.remove(worker);
+ } else {
+ worker.retain();
+ selection = new SelectorSelection(worker, socketChannel,
listener);
+ }
+
+ } else {
+ // Worker starts /w retain count of 1
+ SelectorWorker worker = new SelectorWorker(this);
+ freeWorkers.addFirst(worker);
+ selection = new SelectorSelection(worker, socketChannel,
listener);
+ }
}
-
- SelectorSelection selection = new SelectorSelection(worker,
socketChannel, listener);
+
return selection;
}
@@ -82,7 +99,7 @@ public final class SelectorManager {
}
public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
- freeWorkers.add(worker);
+ freeWorkers.addFirst(worker);
}
public Executor getChannelExecutor() {
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
Wed Mar 10 11:48:08 2010
@@ -16,9 +16,11 @@
*/
package org.apache.activemq.transport.nio;
+import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.nio.SelectorManager.Listener;
@@ -28,23 +30,23 @@ import org.apache.activemq.transport.nio
public final class SelectorSelection {
private final SelectorWorker worker;
- private final SelectionKey key;
private final Listener listener;
private int interest;
+ private SelectionKey key;
+ private AtomicBoolean closed = new AtomicBoolean();
- public SelectorSelection(SelectorWorker worker, SocketChannel
socketChannel, Listener listener) throws ClosedChannelException {
+ public SelectorSelection(final SelectorWorker worker, final SocketChannel
socketChannel, Listener listener) throws ClosedChannelException {
this.worker = worker;
this.listener = listener;
-
- // Lock when mutating state of the selector
- worker.lock();
-
- try {
- this.key = socketChannel.register(worker.selector, 0, this);
- worker.incrementUseCounter();
- } finally {
- worker.unlock();
- }
+ worker.addIoTask(new Runnable() {
+ public void run() {
+ try {
+ SelectorSelection.this.key =
socketChannel.register(worker.selector, 0, SelectorSelection.this);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
}
public void setInterestOps(int ops) {
@@ -52,25 +54,39 @@ public final class SelectorSelection {
}
public void enable() {
- key.interestOps(interest);
- worker.selector.wakeup();
+ worker.addIoTask(new Runnable() {
+ public void run() {
+ try {
+ key.interestOps(interest);
+ } catch (CancelledKeyException e) {
+ }
+ }
+ });
}
public void disable() {
- if (key.isValid()) {
- key.interestOps(0);
- }
+ worker.addIoTask(new Runnable() {
+ public void run() {
+ try {
+ key.interestOps(0);
+ } catch (CancelledKeyException e) {
+ }
+ }
+ });
}
public void close() {
- worker.decrementUseCounter();
-
- // Lock when mutating state of the selector
- worker.lock();
- try {
- key.cancel();
- } finally {
- worker.unlock();
+ // guard against multiple closes.
+ if( closed.compareAndSet(false, true) ) {
+ worker.addIoTask(new Runnable() {
+ public void run() {
+ try {
+ key.cancel();
+ } catch (CancelledKeyException e) {
+ }
+ worker.release();
+ }
+ });
}
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
Wed Mar 10 11:48:08 2010
@@ -21,10 +21,8 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
-
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SelectorWorker implements Runnable {
@@ -33,55 +31,71 @@ public class SelectorWorker implements R
final SelectorManager manager;
final Selector selector;
final int id = NEXT_ID.getAndIncrement();
- final AtomicInteger useCounter = new AtomicInteger();
private final int maxChannelsPerWorker;
- private final ReadWriteLock selectorLock = new ReentrantReadWriteLock();
+
+ final AtomicInteger retainCounter = new AtomicInteger(1);
+ private final ConcurrentLinkedQueue<Runnable> ioTasks = new
ConcurrentLinkedQueue<Runnable>();
public SelectorWorker(SelectorManager manager) throws IOException {
this.manager = manager;
selector = Selector.open();
maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
+ manager.getSelectorExecutor().execute(this);
}
- void incrementUseCounter() {
- int use = useCounter.getAndIncrement();
- if (use == 0) {
- manager.getSelectorExecutor().execute(this);
- } else if (use + 1 == maxChannelsPerWorker) {
+ void retain() {
+ if (retainCounter.incrementAndGet() == maxChannelsPerWorker) {
manager.onWorkerFullEvent(this);
}
}
- void decrementUseCounter() {
- int use = useCounter.getAndDecrement();
- if (use == 1) {
+ void release() {
+ int use = retainCounter.decrementAndGet();
+ if (use == 0) {
manager.onWorkerEmptyEvent(this);
- } else if (use == maxChannelsPerWorker) {
+ } else if (use == maxChannelsPerWorker - 1) {
manager.onWorkerNotFullEvent(this);
}
}
+
+ boolean isReleased() {
+ return retainCounter.get()==0;
+ }
+
- boolean isRunning() {
- return useCounter.get() != 0;
+ public void addIoTask(Runnable work) {
+ ioTasks.add(work);
+ selector.wakeup();
}
+
+ private void processIoTasks() {
+ Runnable task;
+ while( (task= ioTasks.poll()) !=null ) {
+ try {
+ task.run();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+
public void run() {
String origName = Thread.currentThread().getName();
try {
Thread.currentThread().setName("Selector Worker: " + id);
- while (isRunning()) {
-
- lockBarrier();
- int count = selector.select(10);
+ while (!isReleased()) {
+
+ processIoTasks();
+
+ int count = selector.select(10);
+
if (count == 0) {
continue;
}
- if (!isRunning()) {
- return;
- }
-
// Get a java.util.Set containing the SelectionKey objects
// for all channels that are ready for I/O.
Set keys = selector.selectedKeys();
@@ -92,7 +106,9 @@ public class SelectorWorker implements R
final SelectorSelection s =
(SelectorSelection)key.attachment();
try {
- s.disable();
+ if( key.isValid() ) {
+ key.interestOps(0);
+ }
// Kick off another thread to find newly selected keys
// while we process the
@@ -115,11 +131,8 @@ public class SelectorWorker implements R
}
}
- } catch (IOException e) {
-
- // Don't accept any more slections
- manager.onWorkerEmptyEvent(this);
-
+ } catch (Throwable e) {
+ e.printStackTrace();
// Notify all the selections that the error occurred.
Set keys = selector.keys();
for (Iterator i = keys.iterator(); i.hasNext();) {
@@ -127,24 +140,15 @@ public class SelectorWorker implements R
SelectorSelection s = (SelectorSelection)key.attachment();
s.onError(e);
}
-
} finally {
+ try {
+ manager.onWorkerEmptyEvent(this);
+ selector.close();
+ } catch (IOException ignore) {
+ ignore.printStackTrace();
+ }
Thread.currentThread().setName(origName);
}
}
- private void lockBarrier() {
- selectorLock.writeLock().lock();
- selectorLock.writeLock().unlock();
- }
-
- public void lock() {
- selectorLock.readLock().lock();
- selector.wakeup();
- }
-
- public void unlock() {
- selectorLock.readLock().unlock();
- }
-
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
Wed Mar 10 11:48:08 2010
@@ -25,10 +25,6 @@ import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
public class StompConnection {
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
Wed Mar 10 11:48:08 2010
@@ -16,8 +16,9 @@
*/
package org.apache.activemq.transport.stomp;
-import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
@@ -30,11 +31,12 @@ import javax.net.SocketFactory;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.nio.NIOBufferedInputStream;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
@@ -48,6 +50,10 @@ public class StompNIOTransport extends T
private SocketChannel channel;
private SelectorSelection selection;
+
+ private ByteBuffer inputBuffer;
+ ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
+ int previousByte = -1;
public StompNIOTransport(WireFormat wireFormat, SocketFactory
socketFactory, URI remoteLocation, URI localLocation) throws
UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -76,19 +82,54 @@ public class StompNIOTransport extends T
}
});
+ inputBuffer = ByteBuffer.allocate(8 * 1024);
this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 *
1024));
}
-
+
private void serviceRead() {
try {
- DataInputStream in = new DataInputStream(new
NIOBufferedInputStream(channel, 8 * 1024));
- while (true) {
- Object command = wireFormat.unmarshal(in);
- doConsume((Command)command);
- }
-
+
+ while (true) {
+ // read channel
+ int readSize = channel.read(inputBuffer);
+ // channel is closed, cleanup
+ if (readSize == -1) {
+ onException(new EOFException());
+ selection.close();
+ break;
+ }
+ // nothing more to read, break
+ if (readSize == 0) {
+ break;
+ }
+
+ inputBuffer.flip();
+
+ int b;
+ ByteArrayInputStream input = new
ByteArrayInputStream(inputBuffer.array());
+
+ int i = 0;
+ while(i++ < readSize) {
+ b = input.read();
+ // skip repeating nulls
+ if (previousByte == 0 && b == 0) {
+ continue;
+ }
+ currentCommand.write(b);
+ // end of command reached, unmarshal
+ if (b == 0) {
+ Object command = wireFormat.unmarshal(new
ByteSequence(currentCommand.toByteArray()));
+ doConsume((Command)command);
+ currentCommand.reset();
+ }
+ previousByte = b;
+ }
+ // clear the buffer
+ inputBuffer.clear();
+
+ }
} catch (IOException e) {
- onException(e);
+ onException(e);
} catch (Throwable e) {
onException(IOExceptionSupport.create(e));
}
@@ -101,7 +142,11 @@ public class StompNIOTransport extends T
}
protected void doStop(ServiceStopper stopper) throws Exception {
- selection.disable();
+ try {
+ selection.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
super.doStop(stopper);
}
}