Author: chirino
Date: Wed Dec 21 11:23:20 2005
New Revision: 358349
URL: http://svn.apache.org/viewcvs?rev=358349&view=rev
Log:
Got the stomp SUBSCRIBE test case working.
Removed:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/AckListener.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Ack.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscribe.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Unsubscribe.java
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java
incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Ack.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Ack.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Ack.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Ack.java
Wed Dec 21 11:23:20 2005
@@ -3,17 +3,14 @@
*/
package org.activemq.transport.stomp;
-import org.activemq.command.ActiveMQDestination;
-import org.activemq.command.ActiveMQMessage;
-import org.activemq.command.MessageAck;
-import org.activemq.command.TransactionId;
-
import java.io.DataInput;
import java.io.IOException;
import java.net.ProtocolException;
-import java.util.List;
import java.util.Properties;
+import org.activemq.command.MessageAck;
+import org.activemq.command.TransactionId;
+
class Ack implements StompCommand {
private final StompWireFormat format;
private static final HeaderParser parser = new HeaderParser();
@@ -28,41 +25,22 @@
if (message_id == null)
throw new ProtocolException("ACK received without a message-id to
acknowledge!");
- List listeners = format.getAckListeners();
- for (int i = 0; i < listeners.size(); i++) {
- AckListener listener = (AckListener) listeners.get(i);
- if (listener.handle(message_id)) {
- listeners.remove(i);
- ActiveMQMessage msg = listener.getMessage();
- MessageAck ack = new MessageAck();
- ack.setDestination((ActiveMQDestination)
msg.getJMSDestination());
- ack.setConsumerId(listener.getConsumerId());
- ack.setMessageID(msg.getMessageId());
- ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-
- /*
- * ack.setMessageRead(true);
- * ack.setProducerKey(msg.getProducerKey());
- * ack.setSequenceNumber(msg.getSequenceNumber());
- * ack.setPersistent(msg.getJMSDeliveryMode() ==
- * DeliveryMode.PERSISTENT);
- * ack.setSessionId(format.getSessionId());
- */
-
- if (headers.containsKey(Stomp.Headers.TRANSACTION)) {
- TransactionId tx_id =
format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION));
- if (tx_id == null)
- throw new
ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) + " is an
invalid transaction id");
- ack.setTransactionId(tx_id);
- }
-
- while ((in.readByte()) != 0) {
- }
- return new CommandEnvelope(ack, headers);
- }
+ Subscription sub = (Subscription)
format.getDispachedMap().get(message_id);
+ if( sub ==null )
+ throw new ProtocolException("Unexpected ACK received for
message-id [" + message_id + "]");
+
+ MessageAck ack = sub.createMessageAck(message_id);
+
+ if (headers.containsKey(Stomp.Headers.TRANSACTION)) {
+ TransactionId tx_id =
format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION));
+ if (tx_id == null)
+ throw new
ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) + " is an
invalid transaction id");
+ ack.setTransactionId(tx_id);
}
+
while ((in.readByte()) != 0) {
}
- throw new ProtocolException("Unexepected ACK received for message-id
[" + message_id + "]");
+
+ return new CommandEnvelope(ack, headers);
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java
Wed Dec 21 11:23:20 2005
@@ -75,6 +75,7 @@
public interface Unsubscribe {
String DESTINATION = "destination";
+ String ID = "id";
}
public interface Connect {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java
Wed Dec 21 11:23:20 2005
@@ -11,6 +11,7 @@
import java.net.ProtocolException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.jms.JMSException;
@@ -19,9 +20,7 @@
import org.activeio.adapter.PacketInputStream;
import org.activeio.command.WireFormat;
import org.activeio.packet.ByteArrayPacket;
-import org.activemq.command.ActiveMQBytesMessage;
import org.activemq.command.ActiveMQDestination;
-import org.activemq.command.ActiveMQTextMessage;
import org.activemq.command.Command;
import org.activemq.command.CommandTypes;
import org.activemq.command.ConnectionId;
@@ -29,11 +28,14 @@
import org.activemq.command.ConsumerId;
import org.activemq.command.FlushCommand;
import org.activemq.command.LocalTransactionId;
+import org.activemq.command.Message;
+import org.activemq.command.MessageDispatch;
import org.activemq.command.MessageId;
import org.activemq.command.ProducerId;
import org.activemq.command.Response;
import org.activemq.command.SessionId;
import org.activemq.command.TransactionId;
+import org.activemq.filter.DestinationMap;
import org.activemq.util.IOExceptionSupport;
import org.activemq.util.IdGenerator;
import org.activemq.util.LongSequenceGenerator;
@@ -53,15 +55,17 @@
private static int transactionIdCounter;
private int version = 1;
- private CommandParser commandParser = new CommandParser(this);
- private HeaderParser headerParser = new HeaderParser();
+ private final CommandParser commandParser = new CommandParser(this);
+ private final HeaderParser headerParser = new HeaderParser();
- private BlockingQueue pendingReadCommands = new LinkedBlockingQueue();
- private BlockingQueue pendingWriteFrames = new LinkedBlockingQueue();
- private List receiptListeners = new CopyOnWriteArrayList();
- private Map subscriptions = new ConcurrentHashMap();
- private List ackListeners = new CopyOnWriteArrayList();
+ private final BlockingQueue pendingReadCommands = new
LinkedBlockingQueue();
+ private final BlockingQueue pendingWriteFrames = new LinkedBlockingQueue();
+ private final List receiptListeners = new CopyOnWriteArrayList();
+ private final Map subscriptionsByConsumerId = new ConcurrentHashMap();
+ private final Map subscriptionsByName = new ConcurrentHashMap();
+ private final DestinationMap subscriptionsByDestination = new
DestinationMap();
private final Map transactions = new ConcurrentHashMap();
+ private final Map dispachedMap = new ConcurrentHashMap();
private short lastCommandId;
private final ConnectionId connectionId = new
ConnectionId(connectionIdGenerator.generateId());
@@ -119,18 +123,11 @@
}
}
}
-
- if (packet.getDataStructureType() ==
CommandTypes.ACTIVEMQ_TEXT_MESSAGE) {
- assert (packet instanceof ActiveMQTextMessage);
- ActiveMQTextMessage msg = (ActiveMQTextMessage) packet;
- Subscription sub = (Subscription)
subscriptions.get(msg.getJMSDestination());
- sub.receive(msg, out);
- }
- else if (packet.getDataStructureType() ==
CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
- assert (packet instanceof ActiveMQBytesMessage);
- ActiveMQBytesMessage msg = (ActiveMQBytesMessage) packet;
- Subscription sub = (Subscription)
subscriptions.get(msg.getJMSDestination());
- sub.receive(msg, out);
+ if( packet.isMessageDispatch() ) {
+ MessageDispatch md = (MessageDispatch)packet;
+ Message message = md.getMessage();
+ Subscription sub = (Subscription)
subscriptionsByConsumerId.get(md.getConsumerId());
+ sub.receive(md, out);
}
return null;
}
@@ -184,17 +181,35 @@
public ProducerId getProducerId() {
return producerId;
}
+
+
+ public Subscription getSubcription(ConsumerId consumerId) {
+ return (Subscription) subscriptionsByConsumerId.get(consumerId);
+ }
+ public Set getSubcriptions(ActiveMQDestination destination) {
+ return subscriptionsByDestination.get(destination);
+ }
+ public Subscription getSubcription(String name) {
+ return (Subscription) subscriptionsByName.get(name);
+ }
public void addSubscription(Subscription s) {
- if (subscriptions.containsKey(s.getDestination())) {
- Subscription old = (Subscription)
subscriptions.get(s.getDestination());
- Command p = old.close();
- enqueueCommand(p);
- subscriptions.put(s.getDestination(), s);
- }
- else {
- subscriptions.put(s.getDestination(), s);
- }
+ if (s.getSubscriptionId()!=null &&
subscriptionsByName.containsKey(s.getSubscriptionId())) {
+ Subscription old = (Subscription)
subscriptionsByName.get(s.getSubscriptionId());
+ removeSubscription(old);
+ enqueueCommand(old.close());
+ }
+ if( s.getSubscriptionId()!=null )
+ subscriptionsByName.put(s.getSubscriptionId(), s);
+ subscriptionsByConsumerId.put(s.getConsumerInfo().getConsumerId(), s);
+ subscriptionsByDestination.put(s.getConsumerInfo().getDestination(),
s);
+ }
+
+ public void removeSubscription(Subscription s) {
+ if( s.getSubscriptionId()!=null )
+ subscriptionsByName.remove(s.getSubscriptionId());
+ subscriptionsByConsumerId.remove(s.getConsumerInfo().getConsumerId());
+
subscriptionsByDestination.remove(s.getConsumerInfo().getDestination(), s);
}
public void enqueueCommand(final Command ack) {
@@ -205,18 +220,6 @@
});
}
- public Subscription getSubscriptionFor(ActiveMQDestination destination) {
- return (Subscription) subscriptions.get(destination);
- }
-
- public void addAckListener(AckListener listener) {
- this.ackListeners.add(listener);
- }
-
- public List getAckListeners() {
- return ackListeners;
- }
-
public TransactionId getTransactionId(String key) {
return (TransactionId) transactions.get(key);
}
@@ -291,6 +294,10 @@
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
}
+ }
+
+ public Map getDispachedMap() {
+ return dispachedMap;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscribe.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscribe.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscribe.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscribe.java
Wed Dec 21 11:23:20 2005
@@ -3,14 +3,14 @@
*/
package org.activemq.transport.stomp;
-import org.activemq.command.ActiveMQDestination;
-import org.activemq.command.ConsumerId;
-import org.activemq.command.ConsumerInfo;
-
import java.io.DataInput;
import java.io.IOException;
import java.util.Properties;
+import org.activemq.command.ActiveMQDestination;
+import org.activemq.command.ConsumerInfo;
+import org.activemq.util.IntrospectionSupport;
+
class Subscribe implements StompCommand {
private HeaderParser headerParser = new HeaderParser();
private StompWireFormat format;
@@ -20,19 +20,24 @@
}
public CommandEnvelope build(String commandLine, DataInput in) throws
IOException {
- ConsumerInfo ci = new ConsumerInfo();
Properties headers = headerParser.parse(in);
+
+ String subscriptionId =
headers.getProperty(Stomp.Headers.Subscribe.ID);
String destination =
headers.getProperty(Stomp.Headers.Subscribe.DESTINATION);
+
ActiveMQDestination actual_dest =
DestinationNamer.convert(destination);
+ ConsumerInfo ci = new ConsumerInfo(format.createConsumerId());
+ ci.setPrefetchSize(1000);
+ ci.setDispatchAsync(true);
+
+ IntrospectionSupport.setProperties(ci, headers, "activemq:");
+
ci.setDestination(DestinationNamer.convert(destination));
- ConsumerId consumerId = format.createConsumerId();
- ci.setConsumerId(consumerId);
- ci.setResponseRequired(true);
- // ci.setSessionId(format.getSessionId());
+
while (in.readByte() != 0) {
}
- String subscriptionId =
headers.getProperty(Stomp.Headers.Subscribe.ID, Subscription.NO_ID);
- Subscription s = new Subscription(format, consumerId, subscriptionId);
+
+ Subscription s = new Subscription(format, subscriptionId, ci);
s.setDestination(actual_dest);
String ack_mode_key =
headers.getProperty(Stomp.Headers.Subscribe.ACK_MODE);
if (ack_mode_key != null &&
ack_mode_key.equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT)) {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscription.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscription.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscription.java
Wed Dec 21 11:23:20 2005
@@ -3,80 +3,79 @@
*/
package org.activemq.transport.stomp;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import javax.jms.JMSException;
+
import org.activemq.command.ActiveMQBytesMessage;
import org.activemq.command.ActiveMQDestination;
+import org.activemq.command.ActiveMQMessage;
import org.activemq.command.ActiveMQTextMessage;
-import org.activemq.command.ConsumerId;
+import org.activemq.command.ConsumerInfo;
import org.activemq.command.MessageAck;
+import org.activemq.command.MessageDispatch;
import org.activemq.command.RemoveInfo;
-import javax.jms.JMSException;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
public class Subscription {
+
private ActiveMQDestination destination;
private int ackMode = 1;
private StompWireFormat format;
- private final ConsumerId consumerId;
+
private final String subscriptionId;
public static final String NO_ID = "~~ NO SUCH THING [EMAIL PROTECTED]";
-
- public Subscription(StompWireFormat format, ConsumerId consumerId, String
subscriptionId) {
+ private final ConsumerInfo consumerInfo;
+ private final LinkedList dispatchedMessages = new LinkedList();
+
+ public Subscription(StompWireFormat format, String subscriptionId,
ConsumerInfo consumerInfo) {
this.format = format;
- this.consumerId = consumerId;
this.subscriptionId = subscriptionId;
+ this.consumerInfo = consumerInfo;
}
void setDestination(ActiveMQDestination actual_dest) {
this.destination = actual_dest;
}
- void receive(ActiveMQTextMessage msg, DataOutput out) throws IOException,
JMSException {
+ void receive(MessageDispatch md, DataOutput out) throws IOException,
JMSException {
+
+ ActiveMQMessage m = (ActiveMQMessage) md.getMessage();
+
if (ackMode == CLIENT_ACK) {
- AckListener listener = new AckListener(msg, consumerId,
subscriptionId);
- format.addAckListener(listener);
+ Subscription sub = format.getSubcription(md.getConsumerId());
+ sub.addMessageDispatch(md);
+ format.getDispachedMap().put(m.getJMSMessageID(), sub);
}
else if (ackMode == AUTO_ACK) {
- MessageAck ack = new MessageAck();
- // if (format.isInTransaction())
- // ack.setTransactionIDString(format.getTransactionId());
- ack.setDestination(msg.getDestination());
- ack.setConsumerId(consumerId);
- ack.setMessageID(msg.getMessageId());
- ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE,
1);
format.enqueueCommand(ack);
}
- FrameBuilder builder = new
FrameBuilder(Stomp.Responses.MESSAGE).addHeaders(msg).setBody(msg.getText().getBytes());
- if (!subscriptionId.equals(NO_ID)) {
+
+
+ FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE);
+ builder.addHeaders(m);
+
+ if( m.getDataStructureType() ==
ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
+
builder.setBody(((ActiveMQTextMessage)m).getText().getBytes("UTF-8"));
+ } else if( m.getDataStructureType() ==
ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
+ ActiveMQBytesMessage msg = (ActiveMQBytesMessage)m;
+ byte data[] = new byte[(int) msg.getBodyLength()];
+ msg.readBytes(data);
+ builder.setBody(data);
+ }
+
+ if (subscriptionId!=null) {
builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION,
subscriptionId);
}
+
out.write(builder.toFrame());
}
- void receive(ActiveMQBytesMessage msg, DataOutput out) throws IOException,
JMSException {
- // @todo refactor this and the other receive form to remoce duplication
- // -bmc
- if (ackMode == CLIENT_ACK) {
- AckListener listener = new AckListener(msg, consumerId,
subscriptionId);
- format.addAckListener(listener);
- }
- else if (ackMode == AUTO_ACK) {
- MessageAck ack = new MessageAck();
- // if (format.isInTransaction())
- // ack.setTransactionIDString(format.getTransactionId());
- ack.setDestination(msg.getDestination());
- ack.setConsumerId(consumerId);
- ack.setMessageID(msg.getMessageId());
- ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
- format.enqueueCommand(ack);
- }
- FrameBuilder builder = new
FrameBuilder(Stomp.Responses.MESSAGE).addHeaders(msg).setBody(msg.getContent().getData());
- if (!subscriptionId.equals(NO_ID)) {
- builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION,
subscriptionId);
- }
- out.write(builder.toFrame());
+ private void addMessageDispatch(MessageDispatch md) {
+ dispatchedMessages.addLast(md);
}
ActiveMQDestination getDestination() {
@@ -91,8 +90,39 @@
}
public RemoveInfo close() {
- RemoveInfo unsub = new RemoveInfo();
- unsub.setObjectId(consumerId);
- return unsub;
+ return new RemoveInfo(consumerInfo.getConsumerId());
+ }
+
+ public ConsumerInfo getConsumerInfo() {
+ return consumerInfo;
+ }
+
+ public String getSubscriptionId() {
+ return subscriptionId;
+ }
+
+ public MessageAck createMessageAck(String message_id) {
+ MessageAck ack = new MessageAck();
+ ack.setDestination(consumerInfo.getDestination());
+ ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ ack.setConsumerId(consumerInfo.getConsumerId());
+
+ int count=0;
+ for (Iterator iter = dispatchedMessages.iterator(); iter.hasNext();) {
+
+ MessageDispatch md = (MessageDispatch) iter.next();
+ String id = ((ActiveMQMessage)md.getMessage()).getJMSMessageID();
+ if( ack.getFirstMessageId()==null )
+ ack.setFirstMessageId(md.getMessage().getMessageId());
+
+ format.getDispachedMap().remove(id);
+ iter.remove();
+ count++;
+ if( id.equals(message_id) ) {
+ ack.setLastMessageId(md.getMessage().getMessageId());
+ }
+ }
+ ack.setMessageCount(count);
+ return ack;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Unsubscribe.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Unsubscribe.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Unsubscribe.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Unsubscribe.java
Wed Dec 21 11:23:20 2005
@@ -7,7 +7,10 @@
import java.io.DataInput;
import java.io.IOException;
+import java.net.ProtocolException;
+import java.util.Iterator;
import java.util.Properties;
+import java.util.Set;
public class Unsubscribe implements StompCommand {
private static final HeaderParser parser = new HeaderParser();
@@ -22,10 +25,25 @@
while (in.readByte() == 0) {
}
- String dest_name =
headers.getProperty(Stomp.Headers.Unsubscribe.DESTINATION);
- ActiveMQDestination destination = DestinationNamer.convert(dest_name);
+ String subscriptionId =
headers.getProperty(Stomp.Headers.Unsubscribe.ID);
+ String destination =
headers.getProperty(Stomp.Headers.Unsubscribe.DESTINATION);
+
+
+ if( subscriptionId!=null ) {
+ Subscription s = format.getSubcription(subscriptionId);
+ format.removeSubscription(s);
+ return new CommandEnvelope(s.close(), headers);
+ }
+
+ ActiveMQDestination d = DestinationNamer.convert(destination);
+ Set subs = format.getSubcriptions(d);
+ for (Iterator iter = subs.iterator(); iter.hasNext();) {
+ Subscription s = (Subscription) iter.next();
+ format.removeSubscription(s);
+ return new CommandEnvelope(s.close(), headers);
+ }
+
+ throw new ProtocolException("Unexpected UNSUBSCRIBE received.");
- Subscription s = format.getSubscriptionFor(destination);
- return new CommandEnvelope(s.close(), headers);
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java
Wed Dec 21 11:23:20 2005
@@ -8,7 +8,6 @@
import java.net.URI;
import javax.jms.Connection;
-import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -18,7 +17,6 @@
import org.activemq.CombinationTestSupport;
import org.activemq.broker.BrokerService;
import org.activemq.broker.TransportConnector;
-import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQQueue;
public class StompTest extends CombinationTestSupport {
@@ -115,6 +113,34 @@
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
assertEquals("Hello World", message.getText());
+
+ }
+
+ public void testSubscribeWithAutoAck() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n"+
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000000);
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/TEST\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage(getName());
+ producer.send(message);
+
+ frame = receiveFrame(10000);
+ assertTrue(frame.startsWith("MESSAGE"));
}
Modified:
incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties
Wed Dec 21 11:23:20 2005
@@ -1,7 +1,7 @@
#
# The logging properties used during tests..
#
-log4j.rootLogger=INFO, out
+log4j.rootLogger=DEBUG, stdout
log4j.logger.org.activemq.spring=WARN