Author: tabish
Date: Thu Oct 13 14:54:15 2011
New Revision: 1182890
URL: http://svn.apache.org/viewvc?rev=1182890&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3481
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
Thu Oct 13 14:54:15 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
@@ -56,6 +57,8 @@ public abstract class AbstractInactivity
private final AtomicBoolean inReceive = new AtomicBoolean(false);
private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
+ private final ReentrantReadWriteLock sendLock = new
ReentrantReadWriteLock();
+
private SchedulerTimerTask writeCheckerTask;
private SchedulerTimerTask readCheckerTask;
@@ -140,11 +143,17 @@ public abstract class AbstractInactivity
public void run() {
if (monitorStarted.get()) {
try {
- KeepAliveInfo info = new KeepAliveInfo();
-
info.setResponseRequired(keepAliveResponseRequired);
- oneway(info);
+ // If we can't get the lock it means another write
beat us into the
+ // send and we don't need to heart beat now.
+ if (sendLock.writeLock().tryLock()) {
+ KeepAliveInfo info = new KeepAliveInfo();
+
info.setResponseRequired(keepAliveResponseRequired);
+ doOnewaySend(info);
+ }
} catch (IOException e) {
onException(e);
+ } finally {
+ sendLock.writeLock().unlock();
}
}
};
@@ -175,7 +184,6 @@ public abstract class AbstractInactivity
public void run() {
onException(new InactivityIOException("Channel was
inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
};
-
});
} else {
if (LOG.isTraceEnabled()) {
@@ -195,11 +203,14 @@ public abstract class AbstractInactivity
if (command.getClass() == KeepAliveInfo.class) {
KeepAliveInfo info = (KeepAliveInfo) command;
if (info.isResponseRequired()) {
+ sendLock.readLock().lock();
try {
info.setResponseRequired(false);
oneway(info);
} catch (IOException e) {
onException(e);
+ } finally {
+ sendLock.readLock().unlock();
}
}
} else {
@@ -212,39 +223,41 @@ public abstract class AbstractInactivity
}
}
}
- synchronized (readChecker) {
- transportListener.onCommand(command);
- }
+
+ transportListener.onCommand(command);
}
} finally {
-
inReceive.set(false);
}
}
public void oneway(Object o) throws IOException {
- // Disable inactivity monitoring while processing a command.
- // synchronize this method - its not synchronized
- // further down the transport stack and gets called by more
- // than one thread by this class
- synchronized(inSend) {
- inSend.set(true);
- try {
+ // To prevent the inactivity monitor from sending a message while we
+ // are performing a send we take a read lock. The inactivity monitor
+ // sends its Heart-beat commands under a write lock. This means that
+ // the MutexTransport is still responsible for synchronizing sends
+ this.sendLock.readLock().lock();
+ inSend.set(true);
+ try {
+ doOnewaySend(o);
+ } finally {
+ commandSent.set(true);
+ inSend.set(false);
+ this.sendLock.readLock().unlock();
+ }
+ }
- if( failed.get() ) {
- throw new InactivityIOException("Cannot send, channel has
already failed: "+next.getRemoteAddress());
- }
- if (o.getClass() == WireFormatInfo.class) {
- synchronized (this) {
- processOutboundWireFormatInfo((WireFormatInfo) o);
- }
- }
- next.oneway(o);
- } finally {
- commandSent.set(true);
- inSend.set(false);
+ // Must be called under lock, either read or write on sendLock.
+ private void doOnewaySend(Object command) throws IOException {
+ if( failed.get() ) {
+ throw new InactivityIOException("Cannot send, channel has already
failed: "+next.getRemoteAddress());
+ }
+ if (command.getClass() == WireFormatInfo.class) {
+ synchronized (this) {
+ processOutboundWireFormatInfo((WireFormatInfo) command);
}
}
+ next.oneway(command);
}
public void onException(IOException error) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
Thu Oct 13 14:54:15 2011
@@ -17,44 +17,90 @@
package org.apache.activemq.transport;
import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
/**
- *
+ * Thread safe Transport Filter that serializes calls to and from the
Transport Stack.
*/
public class MutexTransport extends TransportFilter {
- private final Object writeMutex = new Object();
+ private final ReentrantLock wreiteLock = new ReentrantLock();
+ private boolean syncOnCommand;
public MutexTransport(Transport next) {
super(next);
+ this.syncOnCommand = false;
}
+ public MutexTransport(Transport next, boolean syncOnCommand) {
+ super(next);
+ this.syncOnCommand = syncOnCommand;
+ }
+
+ @Override
+ public void onCommand(Object command) {
+ if (syncOnCommand) {
+ wreiteLock.lock();
+ try {
+ transportListener.onCommand(command);
+ } finally {
+ wreiteLock.unlock();
+ }
+ } else {
+ transportListener.onCommand(command);
+ }
+ }
+
+ @Override
public FutureResponse asyncRequest(Object command, ResponseCallback
responseCallback) throws IOException {
- synchronized (writeMutex) {
+ wreiteLock.lock();
+ try {
return next.asyncRequest(command, null);
+ } finally {
+ wreiteLock.unlock();
}
}
+ @Override
public void oneway(Object command) throws IOException {
- synchronized (writeMutex) {
+ wreiteLock.lock();
+ try {
next.oneway(command);
+ } finally {
+ wreiteLock.unlock();
}
}
+ @Override
public Object request(Object command) throws IOException {
- synchronized (writeMutex) {
+ wreiteLock.lock();
+ try {
return next.request(command);
+ } finally {
+ wreiteLock.unlock();
}
}
+ @Override
public Object request(Object command, int timeout) throws IOException {
- synchronized (writeMutex) {
+ wreiteLock.lock();
+ try {
return next.request(command, timeout);
+ } finally {
+ wreiteLock.unlock();
}
}
+ @Override
public String toString() {
return next.toString();
}
+ public boolean isSyncOnCommand() {
+ return syncOnCommand;
+ }
+
+ public void setSyncOnCommand(boolean syncOnCommand) {
+ this.syncOnCommand = syncOnCommand;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
Thu Oct 13 14:54:15 2011
@@ -166,16 +166,7 @@ public class ProtocolConverter {
command.setResponseRequired(true);
resposeHandlers.put(Integer.valueOf(command.getCommandId()),
handler);
}
- stompTransport.asyncSendToActiveMQ(command);
- }
-
- protected void asyncSendToActiveMQ(Command command, ResponseHandler
handler) {
- command.setCommandId(generateCommandId());
- if (handler != null) {
- command.setResponseRequired(true);
- resposeHandlers.put(Integer.valueOf(command.getCommandId()),
handler);
- }
- stompTransport.asyncSendToActiveMQ(command);
+ stompTransport.sendToActiveMQ(command);
}
protected void sendToStomp(StompFrame command) throws IOException {
@@ -301,7 +292,7 @@ public class ProtocolConverter {
}
message.onSend();
- asyncSendToActiveMQ(message, createResponseHandler(command));
+ sendToActiveMQ(message, createResponseHandler(command));
}
protected void onStompNack(StompFrame command) throws ProtocolException {
@@ -338,7 +329,7 @@ public class ProtocolConverter {
if (sub != null) {
MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
if (ack != null) {
- asyncSendToActiveMQ(ack, createResponseHandler(command));
+ sendToActiveMQ(ack, createResponseHandler(command));
} else {
throw new ProtocolException("Unexpected NACK received for
message-id [" + messageId + "]");
}
@@ -377,7 +368,7 @@ public class ProtocolConverter {
if (sub != null) {
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
if (ack != null) {
- asyncSendToActiveMQ(ack, createResponseHandler(command));
+ sendToActiveMQ(ack, createResponseHandler(command));
acked = true;
}
}
@@ -391,7 +382,7 @@ public class ProtocolConverter {
for (StompSubscription sub : subscriptionsByConsumerId.values()) {
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
if (ack != null) {
- asyncSendToActiveMQ(ack, createResponseHandler(command));
+ sendToActiveMQ(ack, createResponseHandler(command));
acked = true;
break;
}
@@ -426,7 +417,7 @@ public class ProtocolConverter {
tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.BEGIN);
- asyncSendToActiveMQ(tx, createResponseHandler(command));
+ sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompCommit(StompFrame command) throws ProtocolException {
@@ -453,7 +444,7 @@ public class ProtocolConverter {
tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
- asyncSendToActiveMQ(tx, createResponseHandler(command));
+ sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompAbort(StompFrame command) throws ProtocolException {
@@ -482,7 +473,7 @@ public class ProtocolConverter {
tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.ROLLBACK);
- asyncSendToActiveMQ(tx, createResponseHandler(command));
+ sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompSubscribe(StompFrame command) throws
ProtocolException {
@@ -550,7 +541,7 @@ public class ProtocolConverter {
// dispatch can beat the receipt so send it early
sendReceipt(command);
- asyncSendToActiveMQ(consumerInfo, null);
+ sendToActiveMQ(consumerInfo, null);
}
protected void onStompUnsubscribe(StompFrame command) throws
ProtocolException {
@@ -579,7 +570,7 @@ public class ProtocolConverter {
info.setClientId(durable);
info.setSubscriptionName(durable);
info.setConnectionId(connectionId);
- asyncSendToActiveMQ(info, createResponseHandler(command));
+ sendToActiveMQ(info, createResponseHandler(command));
return;
}
@@ -587,7 +578,7 @@ public class ProtocolConverter {
StompSubscription sub = this.subscriptions.remove(subscriptionId);
if (sub != null) {
-
asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(),
createResponseHandler(command));
+ sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(),
createResponseHandler(command));
return;
}
@@ -598,7 +589,7 @@ public class ProtocolConverter {
for (Iterator<StompSubscription> iter =
subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
if (destination != null &&
destination.equals(sub.getDestination())) {
-
asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(),
createResponseHandler(command));
+
sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(),
createResponseHandler(command));
iter.remove();
return;
}
@@ -721,8 +712,8 @@ public class ProtocolConverter {
protected void onStompDisconnect(StompFrame command) throws
ProtocolException {
checkConnected();
- asyncSendToActiveMQ(connectionInfo.createRemoveCommand(),
createResponseHandler(command));
- asyncSendToActiveMQ(new ShutdownInfo(),
createResponseHandler(command));
+ sendToActiveMQ(connectionInfo.createRemoveCommand(),
createResponseHandler(command));
+ sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
connected.set(false);
}
@@ -787,7 +778,7 @@ public class ProtocolConverter {
ActiveMQDestination rc = tempDestinations.get(name);
if( rc == null ) {
rc = new ActiveMQTempQueue(connectionId,
tempDestinationGenerator.getNextSequenceId());
- asyncSendToActiveMQ(new DestinationInfo(connectionId,
DestinationInfo.ADD_OPERATION_TYPE, rc), null);
+ sendToActiveMQ(new DestinationInfo(connectionId,
DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc);
}
return rc;
@@ -797,7 +788,7 @@ public class ProtocolConverter {
ActiveMQDestination rc = tempDestinations.get(name);
if( rc == null ) {
rc = new ActiveMQTempTopic(connectionId,
tempDestinationGenerator.getNextSequenceId());
- asyncSendToActiveMQ(new DestinationInfo(connectionId,
DestinationInfo.ADD_OPERATION_TYPE, rc), null);
+ sendToActiveMQ(new DestinationInfo(connectionId,
DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc);
tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
Thu Oct 13 14:54:15 2011
@@ -21,6 +21,7 @@ import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
+import java.util.HashMap;
import java.util.Map;
import javax.net.ServerSocketFactory;
@@ -29,6 +30,7 @@ import javax.net.SocketFactory;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
@@ -62,6 +64,19 @@ public class StompNIOTransportFactory ex
}
@SuppressWarnings("rawtypes")
+ @Override
+ public Transport serverConfigure(Transport transport, WireFormat format,
HashMap options) throws Exception {
+ transport = super.serverConfigure(transport, format, options);
+
+ MutexTransport mutex = transport.narrow(MutexTransport.class);
+ if (mutex != null) {
+ mutex.setSyncOnCommand(true);
+ }
+
+ return transport;
+ }
+
+ @SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat
format, Map options) {
transport = new StompTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java
Thu Oct 13 14:54:15 2011
@@ -16,11 +16,13 @@
*/
package org.apache.activemq.transport.stomp;
+import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
@@ -46,6 +48,19 @@ public class StompSslTransportFactory ex
return super.compositeConfigure(transport, format, options);
}
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Transport serverConfigure(Transport transport, WireFormat format,
HashMap options) throws Exception {
+ transport = super.serverConfigure(transport, format, options);
+
+ MutexTransport mutex = transport.narrow(MutexTransport.class);
+ if (mutex != null) {
+ mutex.setSyncOnCommand(true);
+ }
+
+ return transport;
+ }
+
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
Thu Oct 13 14:54:15 2011
@@ -75,7 +75,7 @@ public class StompSubscription {
}
} else if (ackMode == AUTO_ACK) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE,
1);
- protocolConverter.getStompTransport().asyncSendToActiveMQ(ack);
+ protocolConverter.getStompTransport().sendToActiveMQ(ack);
}
boolean ignoreTransformation = false;
@@ -115,7 +115,7 @@ public class StompSubscription {
if (!unconsumedMessage.isEmpty()) {
MessageAck ack = new MessageAck(unconsumedMessage.getLast(),
MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
- protocolConverter.getStompTransport().asyncSendToActiveMQ(ack);
+ protocolConverter.getStompTransport().sendToActiveMQ(ack);
unconsumedMessage.clear();
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
Thu Oct 13 14:54:15 2011
@@ -29,8 +29,6 @@ public interface StompTransport {
public void sendToActiveMQ(Command command);
- public void asyncSendToActiveMQ(Command command);
-
public void sendToStomp(StompFrame command) throws IOException;
public X509Certificate[] getPeerCertificates();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
Thu Oct 13 14:54:15 2011
@@ -16,11 +16,13 @@
*/
package org.apache.activemq.transport.stomp;
+import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
@@ -28,8 +30,6 @@ import org.apache.activemq.wireformat.Wi
/**
* A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
- *
- *
*/
public class StompTransportFactory extends TcpTransportFactory implements
BrokerServiceAware {
@@ -50,6 +50,19 @@ public class StompTransportFactory exten
this.brokerContext = brokerService.getBrokerContext();
}
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Transport serverConfigure(Transport transport, WireFormat format,
HashMap options) throws Exception {
+ transport = super.serverConfigure(transport, format, options);
+
+ MutexTransport mutex = transport.narrow(MutexTransport.class);
+ if (mutex != null) {
+ mutex.setSyncOnCommand(true);
+ }
+
+ return transport;
+ }
+
@Override
protected Transport createInactivityMonitor(Transport transport,
WireFormat format) {
StompInactivityMonitor monitor = new StompInactivityMonitor(transport,
format);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
Thu Oct 13 14:54:15 2011
@@ -18,15 +18,11 @@ package org.apache.activemq.transport.st
import java.io.IOException;
import java.security.cert.X509Certificate;
-import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.Command;
-import org.apache.activemq.thread.DefaultThreadPools;
-import org.apache.activemq.thread.Task;
-import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
@@ -50,58 +46,18 @@ public class StompTransportFilter extend
private final ProtocolConverter protocolConverter;
private StompInactivityMonitor monitor;
private StompWireFormat wireFormat;
- private final TaskRunner asyncSendTask;
- private final ConcurrentLinkedQueue<Command> asyncCommands = new
ConcurrentLinkedQueue<Command>();
private boolean trace;
- private int maxAsyncBatchSize = 25;
public StompTransportFilter(Transport next, WireFormat wireFormat,
BrokerContext brokerContext) {
super(next);
this.protocolConverter = new ProtocolConverter(this, brokerContext);
- asyncSendTask =
DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
- public boolean iterate() {
- int iterations = 0;
- TransportListener listener = transportListener;
- if (listener != null) {
- while (iterations++ < maxAsyncBatchSize &&
!asyncCommands.isEmpty()) {
- Command command = asyncCommands.poll();
- if (command != null) {
- listener.onCommand(command);
- }
- }
- }
- return !asyncCommands.isEmpty();
- }
-
- }, "ActiveMQ StompTransport Async Worker: " +
System.identityHashCode(this));
-
if (wireFormat instanceof StompWireFormat) {
this.wireFormat = (StompWireFormat) wireFormat;
}
}
- public void stop() throws Exception {
- asyncSendTask.shutdown();
-
- TransportListener listener = transportListener;
- if (listener != null) {
- Command commands[] = new Command[0];
- asyncCommands.toArray(commands);
- asyncCommands.clear();
- for(Command command : commands) {
- try {
- listener.onCommand(command);
- } catch(Exception e) {
- break;
- }
- }
- }
-
- super.stop();
- }
-
public void oneway(Object o) throws IOException {
try {
final Command command = (Command) o;
@@ -132,15 +88,6 @@ public class StompTransportFilter extend
}
}
- public void asyncSendToActiveMQ(Command command) {
- asyncCommands.offer(command);
- try {
- asyncSendTask.wakeup();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
public void sendToStomp(StompFrame command) throws IOException {
if (trace) {
TRACE.trace("Sending: \n" + command);
@@ -183,12 +130,4 @@ public class StompTransportFilter extend
public StompWireFormat getWireFormat() {
return this.wireFormat;
}
-
- public int getMaxAsyncBatchSize() {
- return maxAsyncBatchSize;
- }
-
- public void setMaxAsyncBatchSize(int maxAsyncBatchSize) {
- this.maxAsyncBatchSize = maxAsyncBatchSize;
- }
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Thu Oct 13 14:54:15 2011
@@ -22,8 +22,10 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -1717,6 +1719,67 @@ public class StompTest extends Combinati
assertEquals("Number of clients", expected, actual);
}
+ public void testDisconnectDoesNotDeadlockBroker() throws Exception {
+ for (int i = 0; i < 20; ++i) {
+ doTestConnectionLeak();
+ }
+ }
+
+ private void doTestConnectionLeak() throws Exception {
+ stompConnect();
+
+ String frame = "CONNECT\n" + "login: system\n" + "passcode:
manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ boolean gotMessage = false;
+ boolean gotReceipt = false;
+
+ char[] payload = new char[1024];
+ Arrays.fill(payload, 'A');
+
+ String test = "SEND\n" +
+ "x-type:DEV-3485\n" +
+ "x-uuid:" + UUID.randomUUID() + "\n" +
+ "persistent:true\n" +
+ "receipt:" + UUID.randomUUID() + "\n" +
+ "destination:/queue/test.DEV-3485" +
+ "\n\n" +
+ new String(payload) + Stomp.NULL;
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/test.DEV-3485\n" +
"ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+
+ stompConnection.sendFrame(test);
+
+ // We only want one of them, to trigger the shutdown and potentially
+ // see a deadlock.
+ while (!gotMessage && !gotReceipt) {
+ frame = stompConnection.receiveFrame();
+
+ LOG.debug("Received the frame: " + frame);
+
+ if (frame.startsWith("RECEIPT")) {
+ gotReceipt = true;
+ } else if(frame.startsWith("MESSAGE")) {
+ gotMessage = true;
+ } else {
+ fail("Received a frame that we were not expecting.");
+ }
+ }
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+
+ stompConnection.close();
+ }
+
protected void waitForFrameToTakeEffect() throws InterruptedException {
// bit of a dirty hack :)
// another option would be to force some kind of receipt to be returned
Modified:
activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
---
activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
(original)
+++
activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
Thu Oct 13 14:54:15 2011
@@ -101,9 +101,4 @@ class StompSocket extends TransportSuppo
public StompWireFormat getWireFormat() {
return this.wireFormat;
}
-
- @Override
- public void asyncSendToActiveMQ(Command command) {
- doConsume(command);
- }
}