Author: rajdavies
Date: Thu Sep 4 11:21:46 2008
New Revision: 692183
URL: http://svn.apache.org/viewvc?rev=692183&view=rev
Log:
Patch applied to https://issues.apache.org/activemq/browse/AMQ-1874
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=692183&r1=692182&r2=692183&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
Thu Sep 4 11:21:46 2008
@@ -389,6 +389,8 @@
String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
+ } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
+ stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
} else {
stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java?rev=692183&r1=692182&r2=692183&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
Thu Sep 4 11:21:46 2008
@@ -87,6 +87,7 @@
public interface AckModeValues {
String AUTO = "auto";
String CLIENT = "client";
+ String INDIVIDUAL = "client-individual";
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=692183&r1=692182&r2=692183&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
Thu Sep 4 11:21:46 2008
@@ -40,6 +40,7 @@
public static final String AUTO_ACK =
Stomp.Headers.Subscribe.AckModeValues.AUTO;
public static final String CLIENT_ACK =
Stomp.Headers.Subscribe.AckModeValues.CLIENT;
+ public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
private final ProtocolConverter protocolConverter;
private final String subscriptionId;
@@ -66,6 +67,10 @@
synchronized (this) {
dispatchedMessage.put(message.getJMSMessageID(),
message.getMessageId());
}
+ } else if (ackMode == INDIVIDUAL_ACK) {
+ synchronized (this) {
+ dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
+ }
} else if (ackMode == AUTO_ACK) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE,
1);
protocolConverter.getTransportFilter().sendToActiveMQ(ack);
@@ -99,31 +104,38 @@
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
- ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setConsumerId(consumerInfo.getConsumerId());
- int count = 0;
- for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
-
- Map.Entry entry = (Entry)iter.next();
- String id = (String)entry.getKey();
- MessageId msgid = (MessageId)entry.getValue();
-
- if (ack.getFirstMessageId() == null) {
- ack.setFirstMessageId(msgid);
- }
-
- iter.remove();
- count++;
+ if (ackMode == CLIENT_ACK) {
+ ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ int count = 0;
+ for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+
+ Map.Entry entry = (Entry)iter.next();
+ String id = (String)entry.getKey();
+ MessageId msgid = (MessageId)entry.getValue();
+
+ if (ack.getFirstMessageId() == null) {
+ ack.setFirstMessageId(msgid);
+ }
+
+ iter.remove();
+ count++;
+
+ if (id.equals(messageId)) {
+ ack.setLastMessageId(msgid);
+ break;
+ }
- if (id.equals(messageId)) {
- ack.setLastMessageId(msgid);
- break;
}
-
+ ack.setMessageCount(count);
+ }
+ else if (ackMode == INDIVIDUAL_ACK) {
+ ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
+ MessageId msgid = (MessageId)dispatchedMessage.get(messageId);
+ ack.setMessageID(msgid);
+ dispatchedMessage.remove(messageId);
}
-
- ack.setMessageCount(count);
return ack;
}