Author: tabish
Date: Tue Oct 11 19:39:45 2011
New Revision: 1182049
URL: http://svn.apache.org/viewvc?rev=1182049&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3481
Further refine this fix to address some test failures.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1182049&r1=1182048&r2=1182049&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Tue Oct 11 19:39:45 2011
@@ -166,7 +166,16 @@ public class ProtocolConverter {
command.setResponseRequired(true);
resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
}
- stompTransport.sendToActiveMQ(command);
+ stompTransport.asyncSendToActiveMQ(command);
+ }
+
+ protected void asyncSendToActiveMQ(Command command, ResponseHandler handler) {
+ command.setCommandId(generateCommandId());
+ if (handler != null) {
+ command.setResponseRequired(true);
+ resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
+ }
+ stompTransport.asyncSendToActiveMQ(command);
}
protected void sendToStomp(StompFrame command) throws IOException {
@@ -292,7 +301,7 @@ public class ProtocolConverter {
}
message.onSend();
- sendToActiveMQ(message, createResponseHandler(command));
+ asyncSendToActiveMQ(message, createResponseHandler(command));
}
protected void onStompNack(StompFrame command) throws ProtocolException {
@@ -329,7 +338,7 @@ public class ProtocolConverter {
if (sub != null) {
MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
if (ack != null) {
- sendToActiveMQ(ack, createResponseHandler(command));
+ asyncSendToActiveMQ(ack, createResponseHandler(command));
} else {
throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
}
@@ -368,7 +377,7 @@ public class ProtocolConverter {
if (sub != null) {
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
if (ack != null) {
- sendToActiveMQ(ack, createResponseHandler(command));
+ asyncSendToActiveMQ(ack, createResponseHandler(command));
acked = true;
}
}
@@ -382,7 +391,7 @@ public class ProtocolConverter {
for (StompSubscription sub : subscriptionsByConsumerId.values()) {
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
if (ack != null) {
- sendToActiveMQ(ack, createResponseHandler(command));
+ asyncSendToActiveMQ(ack, createResponseHandler(command));
acked = true;
break;
}
@@ -417,7 +426,7 @@ public class ProtocolConverter {
tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.BEGIN);
- sendToActiveMQ(tx, createResponseHandler(command));
+ asyncSendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompCommit(StompFrame command) throws ProtocolException {
@@ -444,7 +453,7 @@ public class ProtocolConverter {
tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
- sendToActiveMQ(tx, createResponseHandler(command));
+ asyncSendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompAbort(StompFrame command) throws ProtocolException {
@@ -473,7 +482,7 @@ public class ProtocolConverter {
tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.ROLLBACK);
- sendToActiveMQ(tx, createResponseHandler(command));
+ asyncSendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompSubscribe(StompFrame command) throws ProtocolException {
@@ -541,7 +550,7 @@ public class ProtocolConverter {
// dispatch can beat the receipt so send it early
sendReceipt(command);
- sendToActiveMQ(consumerInfo, null);
+ asyncSendToActiveMQ(consumerInfo, null);
}
protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
@@ -570,7 +579,7 @@ public class ProtocolConverter {
info.setClientId(durable);
info.setSubscriptionName(durable);
info.setConnectionId(connectionId);
- sendToActiveMQ(info, createResponseHandler(command));
+ asyncSendToActiveMQ(info, createResponseHandler(command));
return;
}
@@ -578,7 +587,7 @@ public class ProtocolConverter {
StompSubscription sub = this.subscriptions.remove(subscriptionId);
if (sub != null) {
- sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
+ asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
return;
}
@@ -589,7 +598,7 @@ public class ProtocolConverter {
for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
if (destination != null && destination.equals(sub.getDestination())) {
- sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
+ asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
iter.remove();
return;
}
@@ -712,8 +721,8 @@ public class ProtocolConverter {
protected void onStompDisconnect(StompFrame command) throws ProtocolException {
checkConnected();
- sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
- sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
+ asyncSendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
+ asyncSendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
connected.set(false);
}
@@ -778,7 +787,7 @@ public class ProtocolConverter {
ActiveMQDestination rc = tempDestinations.get(name);
if( rc == null ) {
rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
- sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
+ asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc);
}
return rc;
@@ -788,7 +797,7 @@ public class ProtocolConverter {
ActiveMQDestination rc = tempDestinations.get(name);
if( rc == null ) {
rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
- sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
+ asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc);
tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1182049&r1=1182048&r2=1182049&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Tue Oct 11 19:39:45 2011
@@ -115,7 +115,7 @@ public class StompSubscription {
if (!unconsumedMessage.isEmpty()) {
MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
- protocolConverter.getStompTransport().sendToActiveMQ(ack);
+ protocolConverter.getStompTransport().asyncSendToActiveMQ(ack);
unconsumedMessage.clear();
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1182049&r1=1182048&r2=1182049&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Tue Oct 11 19:39:45 2011
@@ -1588,7 +1588,6 @@ public class StompTest extends Combinati
stompConnection.connect("system", "manager");
HashMap<String, String> headers = new HashMap<String, String>();
- long timestamp = System.currentTimeMillis();
headers.put(Stomp.Headers.Send.REPLY_TO, "JustAString");
headers.put(Stomp.Headers.Send.PERSISTENT, "true");