Author: dejanb
Date: Mon Aug 17 11:43:34 2009
New Revision: 804943
URL: http://svn.apache.org/viewvc?rev=804943&view=rev
Log:
additional fix for https://issues.apache.org/activemq/browse/AMQ-1807 - fixing
test case and making it work
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Aug 17 11:43:34 2009
@@ -236,7 +236,6 @@
dequeueCounter++;
dispatched.remove(node);
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
- prefetchExtension--;
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
Mon Aug 17 11:43:34 2009
@@ -361,11 +361,7 @@
}
for (Iterator<StompSubscription> iter =
subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
- try {
- sub.onStompAbort(activemqTx);
- } catch (Exception e) {
- throw new ProtocolException("Transaction abort failed", false,
e);
- }
+ sub.onStompAbort(activemqTx);
}
TransactionInfo tx = new TransactionInfo();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
Mon Aug 17 11:43:34 2009
@@ -188,15 +188,6 @@
}
public void abort(String transaction) throws Exception {
- // discard all content on the wire before
- // aborting the transaction
- try {
- StompFrame discarded = this.receive(100);
- while (discarded != null) {
- discarded = this.receive(100);
- }
- } catch (Exception e) {
- }
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("transaction", transaction);
StompFrame frame = new StompFrame("ABORT", headers);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
Mon Aug 17 11:43:34 2009
@@ -99,43 +99,20 @@
protocolConverter.getTransportFilter().sendToStomp(command);
}
- synchronized void onStompAbort(TransactionId transactionId) throws
IOException, JMSException {
- //ack all unacked messages
- for (MessageDispatch md : dispatchedMessage.values()) {
- if (!unconsumedMessage.contains(md)) {
- MessageAck ack = new MessageAck();
- ack.setDestination(consumerInfo.getDestination());
- ack.setConsumerId(consumerInfo.getConsumerId());
- ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
- ack.setFirstMessageId(md.getMessage().getMessageId());
- ack.setLastMessageId(md.getMessage().getMessageId());
- ack.setMessageCount(1);
- ack.setTransactionId(transactionId);
- protocolConverter.getTransportFilter().sendToActiveMQ(ack);
- unconsumedMessage.add(md);
- }
- }
- // redeliver all unconsumed messages
- for (MessageDispatch md : unconsumedMessage) {
- onMessageDispatch(md);
- }
+ synchronized void onStompAbort(TransactionId transactionId) {
+ unconsumedMessage.clear();
}
synchronized void onStompCommit(TransactionId transactionId) {
- // ack all messages
- if (!unconsumedMessage.isEmpty()) {
- MessageAck ack = new MessageAck();
- ack.setDestination(consumerInfo.getDestination());
- ack.setConsumerId(consumerInfo.getConsumerId());
- ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-
ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
-
ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
- ack.setMessageCount(unconsumedMessage.size());
- ack.setTransactionId(transactionId);
- protocolConverter.getTransportFilter().sendToActiveMQ(ack);
- // clear lists
- unconsumedMessage.clear();
+ for (Iterator iter = dispatchedMessage.entrySet().iterator();
iter.hasNext();) {
+ Map.Entry entry = (Entry)iter.next();
+ MessageId id = (MessageId)entry.getKey();
+ MessageDispatch msg = (MessageDispatch)entry.getValue();
+ if (unconsumedMessage.contains(msg)) {
+ iter.remove();
+ }
}
+ unconsumedMessage.clear();
}
synchronized MessageAck onStompMessageAck(String messageId, TransactionId
transactionId) {
@@ -151,11 +128,7 @@
ack.setConsumerId(consumerInfo.getConsumerId());
if (ackMode == CLIENT_ACK) {
- if (transactionId != null) {
- ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
- } else {
- ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
- }
+ ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
int count = 0;
for (Iterator iter = dispatchedMessage.entrySet().iterator();
iter.hasNext();) {
@@ -168,10 +141,12 @@
}
if (transactionId != null) {
- if (!unconsumedMessage.contains(msg))
+ if (!unconsumedMessage.contains(msg)) {
unconsumedMessage.add(msg);
+ }
+ } else {
+ iter.remove();
}
- iter.remove();
count++;
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Mon Aug 17 11:43:34 2009
@@ -977,24 +977,24 @@
stompConnection.begin("tx2");
+ // Previously delivered message need to get re-acked...
+ stompConnection.ack(frame, "tx2");
+ stompConnection.ack(frame1, "tx2");
+
StompFrame frame3 = stompConnection.receive();
- assertEquals(frame3.getBody(), "message 1");
+ assertEquals(frame3.getBody(), "message 3");
stompConnection.ack(frame3, "tx2");
StompFrame frame4 = stompConnection.receive();
- assertEquals(frame4.getBody(), "message 2");
+ assertEquals(frame4.getBody(), "message 4");
stompConnection.ack(frame4, "tx2");
- StompFrame frame5 = stompConnection.receive();
- assertEquals(frame5.getBody(), "message 3");
- stompConnection.ack(frame5, "tx2");
-
stompConnection.commit("tx2");
stompConnection.begin("tx3");
- StompFrame frame6 = stompConnection.receive();
- assertEquals(frame6.getBody(), "message 4");
- stompConnection.ack(frame6, "tx3");
+ StompFrame frame5 = stompConnection.receive();
+ assertEquals(frame5.getBody(), "message 5");
+ stompConnection.ack(frame5, "tx3");
stompConnection.commit("tx3");
stompDisconnect();