Author: dejanb
Date: Mon Feb 8 10:46:17 2010
New Revision: 907613
URL: http://svn.apache.org/viewvc?rev=907613&view=rev
Log:
merging 906071,906300,906301 -
https://issues.apache.org/activemq/browse/AMQ-2490 - JMSXUserID not propagated
to STOMP consumer
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSslAuthTest.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java?rev=907613&r1=907612&r2=907613&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
Mon Feb 8 10:46:17 2010
@@ -74,6 +74,10 @@
headers.put(Stomp.Headers.Message.TYPE, message.getJMSType());
}
+ if (message.getUserID() != null) {
+ headers.put(Stomp.Headers.Message.USERID, message.getUserID());
+ }
+
// now lets add all the message headers
final Map<String, Object> properties = message.getProperties();
if (properties != null) {
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java?rev=907613&r1=907612&r2=907613&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
Mon Feb 8 10:46:17 2010
@@ -76,6 +76,7 @@
String TIMESTAMP = "timestamp";
String TYPE = "type";
String SUBSCRIPTION = "subscription";
+ String USERID = "JMSXUserID";
}
public interface Subscribe {
Modified:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSslAuthTest.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSslAuthTest.java?rev=907613&r1=907612&r2=907613&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSslAuthTest.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSslAuthTest.java
Mon Feb 8 10:46:17 2010
@@ -30,20 +30,20 @@
*/
public class StompSslAuthTest extends StompTest {
-
+
protected void setUp() throws Exception {
-
- // Test mutual authentication on both stomp and standard ssl transports
- bindAddress = "stomp+ssl://localhost:61612";
+
+ // Test mutual authentication on both stomp and standard ssl transports
+ bindAddress = "stomp+ssl://localhost:61612";
confUri =
"xbean:org/apache/activemq/transport/stomp/sslstomp-mutual-auth-broker.xml";
jmsUri="ssl://localhost:61617";
-
+
System.setProperty("javax.net.ssl.trustStore",
"src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore",
"src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
- System.setProperty("javax.net.ssl.keyStoreType", "jks");
+ System.setProperty("javax.net.ssl.keyStoreType", "jks");
//System.setProperty("javax.net.debug","ssl,handshake");
super.setUp();
}
@@ -52,18 +52,23 @@
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", connectUri.getPort());
}
-
+
// NOOP - These operations handled by jaas cert login module
public void testConnectNotAuthenticatedWrongUser() throws Exception {
}
-
+
public void testConnectNotAuthenticatedWrongPassword() throws Exception {
}
-
+
public void testSendNotAuthorized() throws Exception {
}
-
+
public void testSubscribeNotAuthorized() throws Exception {
}
-
+
+ public void testJMSXUserIDIsSetInMessage() throws Exception {
+ }
+
+ public void testJMSXUserIDIsSetInStompMessage() throws Exception {
+ }
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=907613&r1=907612&r2=907613&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Mon Feb 8 10:46:17 2010
@@ -58,53 +58,53 @@
private Connection connection;
private Session session;
private ActiveMQQueue queue;
- private String xmlObject = "<pojo>\n"
+ private String xmlObject = "<pojo>\n"
+ " <name>Dejan</name>\n"
- + " <city>Belgrade</city>\n"
+ + " <city>Belgrade</city>\n"
+ "</pojo>";
- private String xmlMap = "<map>\n"
+ private String xmlMap = "<map>\n"
+ " <entry>\n"
- + " <string>name</string>\n"
+ + " <string>name</string>\n"
+ " <string>Dejan</string>\n"
- + " </entry>\n"
- + " <entry>\n"
+ + " </entry>\n"
+ + " <entry>\n"
+ " <string>city</string>\n"
- + " <string>Belgrade</string>\n"
- + " </entry>\n"
+ + " <string>Belgrade</string>\n"
+ + " </entry>\n"
+ "</map>\n";
- private String jsonObject = "{\"pojo\":{"
+ private String jsonObject = "{\"pojo\":{"
+ "\"name\":\"Dejan\","
- + "\"city\":\"Belgrade\""
+ + "\"city\":\"Belgrade\""
+ "}}";
- private String jsonMap = "{\"map\":{"
+ private String jsonMap = "{\"map\":{"
+ "\"entry\":["
+ "{\"string\":[\"name\",\"Dejan\"]},"
- + "{\"string\":[\"city\",\"Belgrade\"]}"
- + "]"
+ + "{\"string\":[\"city\",\"Belgrade\"]}"
+ + "]"
+ "}}";
protected void setUp() throws Exception {
// The order of the entries is different when using ibm jdk 5.
if (System.getProperty("java.vendor").equals("IBM Corporation")
&& System.getProperty("java.version").startsWith("1.5")) {
- xmlMap = "<map>\n"
- + " <entry>\n"
+ xmlMap = "<map>\n"
+ + " <entry>\n"
+ " <string>city</string>\n"
- + " <string>Belgrade</string>\n"
- + " </entry>\n"
+ + " <string>Belgrade</string>\n"
+ + " </entry>\n"
+ " <entry>\n"
- + " <string>name</string>\n"
+ + " <string>name</string>\n"
+ " <string>Dejan</string>\n"
- + " </entry>\n"
+ + " </entry>\n"
+ "</map>\n";
- jsonMap = "{\"map\":{"
+ jsonMap = "{\"map\":{"
+ "\"entry\":["
- + "{\"string\":[\"city\",\"Belgrade\"]},"
+ + "{\"string\":[\"city\",\"Belgrade\"]},"
+ "{\"string\":[\"name\",\"Dejan\"]}"
- + "]"
+ + "]"
+ "}}";
}
broker = BrokerFactory.createBroker(new URI(confUri));
@@ -133,14 +133,14 @@
}
protected void tearDown() throws Exception {
- try {
- connection.close();
- stompDisconnect();
- } catch(Exception e) {
- // Some tests explicitly disconnect from stomp so can ignore
- } finally {
- broker.stop();
- }
+ try {
+ connection.close();
+ stompDisconnect();
+ } catch(Exception e) {
+ // Some tests explicitly disconnect from stomp so can ignore
+ } finally {
+ broker.stop();
+ }
}
private void stompDisconnect() throws IOException {
@@ -470,9 +470,7 @@
LOG.info("Received frame: " + frame);
fail("No message should have been received since subscription was
removed");
} catch (SocketTimeoutException e) {
-
}
-
}
public void testTransactionCommit() throws Exception {
@@ -554,29 +552,28 @@
assertClients(1);
}
-
+
public void testConnectNotAuthenticatedWrongUser() throws Exception {
String frame = "CONNECT\n" + "login: dejanb\n" + "passcode:
manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame();
-
- assertTrue(f.startsWith("ERROR"));
+
+ assertTrue(f.startsWith("ERROR"));
assertClients(1);
-
}
-
+
public void testConnectNotAuthenticatedWrongPassword() throws Exception {
-
+
String frame = "CONNECT\n" + "login: system\n" + "passcode:
dejanb\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame();
-
- assertTrue(f.startsWith("ERROR"));
- assertClients(1);
- }
-
+
+ assertTrue(f.startsWith("ERROR"));
+ assertClients(1);
+ }
+
public void testSendNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login: guest\n" + "passcode:
password\n\n" + Stomp.NULL;
@@ -590,9 +587,8 @@
stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("ERROR"));
-
}
-
+
public void testSubscribeNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login: guest\n" + "passcode:
password\n\n" + Stomp.NULL;
@@ -607,8 +603,8 @@
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("ERROR"));
- }
-
+ }
+
public void testTransformationUnknownTranslator() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@@ -624,9 +620,9 @@
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
- assertEquals("Hello World", message.getText());
+ assertEquals("Hello World", message.getText());
}
-
+
public void testTransformationFailed() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@@ -643,9 +639,9 @@
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
assertNotNull(message.getStringProperty(Stomp.Headers.TRANSFORMATION_ERROR));
- assertEquals("Hello World", message.getText());
+ assertEquals("Hello World", message.getText());
}
-
+
public void testTransformationSendXMLObject() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@@ -654,7 +650,7 @@
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
-
+
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
"transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + xmlObject +
Stomp.NULL;
stompConnection.sendFrame(frame);
@@ -663,8 +659,8 @@
assertNotNull(message);
SamplePojo object = (SamplePojo)message.getObject();
assertEquals("Dejan", object.getName());
- }
-
+ }
+
public void testTransformationSendJSONObject() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@@ -673,7 +669,7 @@
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
-
+
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
"transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + jsonObject
+ Stomp.NULL;
stompConnection.sendFrame(frame);
@@ -683,13 +679,13 @@
SamplePojo object = (SamplePojo)message.getObject();
assertEquals("Dejan", object.getName());
}
-
+
public void testTransformationSubscribeXML() throws Exception {
-
+
MessageProducer producer = session.createProducer(new
ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new
SamplePojo("Dejan", "Belgrade"));
producer.send(message);
-
+
String frame = "CONNECT\n" + "login: system\n" + "passcode:
manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@@ -698,7 +694,7 @@
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() +
"\n" + "ack:auto" + "\n" + "transformation:" +
Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
-
+
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject));
@@ -706,12 +702,12 @@
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
-
+
public void testTransformationReceiveJSONObject() throws Exception {
MessageProducer producer = session.createProducer(new
ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new
SamplePojo("Dejan", "Belgrade"));
producer.send(message);
-
+
String frame = "CONNECT\n" + "login: system\n" + "passcode:
manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@@ -720,21 +716,21 @@
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() +
"\n" + "ack:auto" + "\n" + "transformation:" +
Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
-
+
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(jsonObject));
-
+
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- stompConnection.sendFrame(frame);
+ stompConnection.sendFrame(frame);
}
-
+
public void testTransformationReceiveXMLObject() throws Exception {
-
+
MessageProducer producer = session.createProducer(new
ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new
SamplePojo("Dejan", "Belgrade"));
producer.send(message);
-
+
String frame = "CONNECT\n" + "login: system\n" + "passcode:
manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@@ -743,15 +739,15 @@
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() +
"\n" + "ack:auto" + "\n" + "transformation:" +
Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
-
+
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject));
-
+
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
- }
-
+ }
+
public void testTransformationNotOverrideSubscription() throws Exception {
MessageProducer producer = session.createProducer(new
ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new
SamplePojo("Dejan", "Belgrade"));
@@ -866,7 +862,7 @@
message.setString("name", "Dejan");
message.setString("city", "Belgrade");
producer.send(message);
-
+
String frame = "CONNECT\n" + "login: system\n" + "passcode:
manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@@ -875,81 +871,81 @@
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() +
"\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON +
"\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
-
+
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(jsonMap.trim()));
-
+
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- stompConnection.sendFrame(frame);
+ stompConnection.sendFrame(frame);
}
-
+
public void testDurableUnsub() throws Exception {
- // get broker JMX view
-
+ // get broker JMX view
+
String domain = "org.apache.activemq";
ObjectName brokerName = new ObjectName(domain +
":Type=Broker,BrokerName=localhost");
-
- BrokerViewMBean view =
(BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName,
BrokerViewMBean.class, true);
-
- // connect
+
+ BrokerViewMBean view =
(BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName,
BrokerViewMBean.class, true);
+
+ // connect
String frame = "CONNECT\n" + "login: system\n" + "passcode:
manager\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
assertEquals(view.getDurableTopicSubscribers().length, 0);
-
+
// subscribe
frame = "SUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n"
+ "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// wait a bit for MBean to get refreshed
try {
- Thread.sleep(400);
+ Thread.sleep(400);
} catch (InterruptedException e){}
-
+
assertEquals(view.getDurableTopicSubscribers().length, 1);
// disconnect
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
try {
- Thread.sleep(400);
+ Thread.sleep(400);
} catch (InterruptedException e){}
-
+
//reconnect
stompConnect();
- // connect
+ // connect
frame = "CONNECT\n" + "login: system\n" + "passcode:
manager\nclient-id:test\n\n" + Stomp.NULL;
- stompConnection.sendFrame(frame);
+ stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
-
+
// unsubscribe
frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() +
"\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- stompConnection.sendFrame(frame);
+ stompConnection.sendFrame(frame);
try {
- Thread.sleep(400);
+ Thread.sleep(400);
} catch (InterruptedException e){}
assertEquals(view.getDurableTopicSubscribers().length, 0);
}
-
- public void testMessageIdHeader() throws Exception {
+
+ public void testMessageIdHeader() throws Exception {
stompConnection.connect("system", "manager");
-
+
stompConnection.begin("tx1");
stompConnection.send("/queue/" + getQueueName(), "msg", "tx1", null);
stompConnection.commit("tx1");
-
+
stompConnection.subscribe("/queue/" + getQueueName());
StompFrame stompMessage = stompConnection.receive();
- assertNull(stompMessage.getHeaders().get("transaction"));
+ assertNull(stompMessage.getHeaders().get("transaction"));
}
-
+
public void testPrefetchSize() throws Exception {
stompConnection.connect("system", "manager");
-
+
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("activemq.prefetchSize", "1");
stompConnection.subscribe("/queue/" + getQueueName(), "client",
headers);
@@ -960,85 +956,80 @@
sendMessage("message 3");
sendMessage("message 4");
sendMessage("message 5");
-
-
StompFrame frame = stompConnection.receive();
assertEquals(frame.getBody(), "message 1");
-
+
stompConnection.begin("tx1");
stompConnection.ack(frame, "tx1");
StompFrame frame1 = stompConnection.receive();
assertEquals(frame1.getBody(), "message 2");
-
+
try {
- StompFrame frame2 = stompConnection.receive(500);
- if (frame2 != null) {
- fail("Should not have received the second message");
- }
+ StompFrame frame2 = stompConnection.receive(500);
+ if (frame2 != null) {
+ fail("Should not have received the second message");
+ }
} catch (SocketTimeoutException soe) {}
-
+
stompConnection.ack(frame1, "tx1");
Thread.sleep(1000);
stompConnection.abort("tx1");
-
+
stompConnection.begin("tx2");
-
+
// Previously delivered message need to get re-acked...
stompConnection.ack(frame, "tx2");
stompConnection.ack(frame1, "tx2");
-
+
StompFrame frame3 = stompConnection.receive();
assertEquals(frame3.getBody(), "message 3");
stompConnection.ack(frame3, "tx2");
-
+
StompFrame frame4 = stompConnection.receive();
assertEquals(frame4.getBody(), "message 4");
stompConnection.ack(frame4, "tx2");
-
+
stompConnection.commit("tx2");
-
+
stompConnection.begin("tx3");
StompFrame frame5 = stompConnection.receive();
assertEquals(frame5.getBody(), "message 5");
stompConnection.ack(frame5, "tx3");
stompConnection.commit("tx3");
-
+
stompDisconnect();
-
}
-
+
public void testTransactionsWithMultipleDestinations() throws Exception {
- stompConnection.connect("system", "manager");
-
+ stompConnection.connect("system", "manager");
+
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("activemq.prefetchSize", "1");
headers.put("activemq.exclusive", "true");
-
- stompConnection.subscribe("/queue/test1", "client", headers);
-
- stompConnection.begin("ID:tx1");
-
- headers.clear();
- headers.put("receipt", "ID:msg1");
- stompConnection.send("/queue/test2", "test message", "ID:tx1", headers);
-
- stompConnection.commit("ID:tx1");
-
- // make sure connection is active after commit
- Thread.sleep(1000);
- stompConnection.send("/queue/test1", "another message");
-
- StompFrame frame = stompConnection.receive(500);
- System.out.println(frame);
- assertNotNull(frame);
-
-
- stompConnection.disconnect();
+
+ stompConnection.subscribe("/queue/test1", "client", headers);
+
+ stompConnection.begin("ID:tx1");
+
+ headers.clear();
+ headers.put("receipt", "ID:msg1");
+ stompConnection.send("/queue/test2", "test message", "ID:tx1",
headers);
+
+ stompConnection.commit("ID:tx1");
+
+ // make sure connection is active after commit
+ Thread.sleep(1000);
+ stompConnection.send("/queue/test1", "another message");
+
+ StompFrame frame = stompConnection.receive(500);
+ assertNotNull(frame);
+
+ stompConnection.disconnect();
}
-
+
public void testTempDestination() throws Exception {
String frame = "CONNECT\n" + "login: system\n" + "passcode:
manager\n\n" + Stomp.NULL;
@@ -1046,7 +1037,7 @@
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
-
+
frame = "SUBSCRIBE\n" + "destination:/temp-queue/" + getQueueName() +
"\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@@ -1056,7 +1047,45 @@
StompFrame message = stompConnection.receive(1000);
assertEquals("Hello World", message.getBody());
}
-
+
+ public void testJMSXUserIDIsSetInMessage() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame = "CONNECT\n" + "login: system\n" + "passcode:
manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" +
"Hello World" + Stomp.NULL;
+
+ stompConnection.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals("system",
message.getStringProperty(Stomp.Headers.Message.USERID));
+
+ }
+
+ public void testJMSXUserIDIsSetInStompMessage() throws Exception {
+
+ String frame = "CONNECT\n" + "login: system\n" + "passcode:
manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n"
+ "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" +
"Hello World" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ StompFrame message = stompConnection.receive(5000);
+ assertEquals("system",
message.getHeaders().get(Stomp.Headers.Message.USERID));
+ }
+
protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients =
broker.getBroker().getClients();
int actual = clients.length;
@@ -1071,5 +1100,3 @@
Thread.sleep(2000);
}
}
-
-