http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java index 407df80..d4158ac 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java @@ -32,8 +32,9 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; @@ -46,28 +47,28 @@ import org.junit.Test; /* * */ -public class StompV11Test extends StompV11TestBase { +public class StompV11Test extends StompTestBase { private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER; public static final String CLIENT_ID = "myclientid"; - private StompClientConnection connV11; + private StompClientConnection conn; @Override @Before public void setUp() throws Exception { super.setUp(); - connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); + conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); } @Override @After public void tearDown() throws Exception { try { - boolean connected = connV11 != null && connV11.isConnected(); - log.debug("Connection 11 : " + connected); + boolean connected = conn != null && conn.isConnected(); + log.debug("Connection 1.1 : " + connected); if (connected) { - connV11.disconnect(); + conn.disconnect(); } } finally { super.tearDown(); @@ -115,276 +116,234 @@ public class StompV11Test extends StompV11TestBase { conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection("1.1", hostname, port); ClientStompFrame frame = conn.connect("invaliduser", defPass); assertFalse(conn.isConnected()); - assertTrue("ERROR".equals(frame.getCommand())); + assertTrue(Stomp.Responses.ERROR.equals(frame.getCommand())); assertTrue(frame.getBody().contains("Security Error occurred")); } @Test public void testNegotiation() throws Exception { // case 1 accept-version absent. It is a 1.0 connect - ClientStompFrame frame = connV11.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); - ClientStompFrame reply = connV11.sendFrame(frame); + ClientStompFrame reply = conn.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); //reply headers: version, session, server assertEquals(null, reply.getHeader("version")); - connV11.disconnect(); + conn.disconnect(); // case 2 accept-version=1.0, result: 1.0 - connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - frame = connV11.createFrame("CONNECT"); - frame.addHeader("accept-version", "1.0"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); + conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); + frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0") + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); - reply = connV11.sendFrame(frame); + reply = conn.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); //reply headers: version, session, server assertEquals("1.0", reply.getHeader("version")); - connV11.disconnect(); + conn.disconnect(); // case 3 accept-version=1.1, result: 1.1 - connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - frame = connV11.createFrame("CONNECT"); - frame.addHeader("accept-version", "1.1"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); + conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); + frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.1") + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); - reply = connV11.sendFrame(frame); + reply = conn.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); //reply headers: version, session, server assertEquals("1.1", reply.getHeader("version")); - connV11.disconnect(); + conn.disconnect(); // case 4 accept-version=1.0,1.1,1.2, result 1.1 - connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - frame = connV11.createFrame("CONNECT"); - frame.addHeader("accept-version", "1.0,1.1,1.3"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); + conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); + frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1,1.3") + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); - reply = connV11.sendFrame(frame); + reply = conn.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); //reply headers: version, session, server assertEquals("1.1", reply.getHeader("version")); - connV11.disconnect(); + conn.disconnect(); // case 5 accept-version=1.2, result error - connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - frame = connV11.createFrame("CONNECT"); - frame.addHeader("accept-version", "1.3"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); + conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); + frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.3") + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); - reply = connV11.sendFrame(frame); + reply = conn.sendFrame(frame); - assertEquals("ERROR", reply.getCommand()); + assertEquals(Stomp.Responses.ERROR, reply.getCommand()); - System.out.println("Got error frame " + reply); + IntegrationTestLogger.LOGGER.info("Got error frame " + reply); } @Test public void testSendAndReceive() throws Exception { - connV11.connect(defUser, defPass); - ClientStompFrame frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); - frame.setBody("Hello World 1!"); + conn.connect(defUser, defPass); - ClientStompFrame response = connV11.sendFrame(frame); + ClientStompFrame response = send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!"); assertNull(response); - frame.addHeader("receipt", "1234"); - frame.setBody("Hello World 2!"); - - response = connV11.sendFrame(frame); + String uuid = UUID.randomUUID().toString(); - assertNotNull(response); - - assertEquals("RECEIPT", response.getCommand()); - - assertEquals("1234", response.getHeader("receipt-id")); + response = send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 2!", true); //subscribe StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); newConn.connect(defUser, defPass); - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); + subscribe(newConn, "a-sub"); - newConn.sendFrame(subFrame); - - frame = newConn.receiveFrame(); + ClientStompFrame frame = newConn.receiveFrame(); - System.out.println("received " + frame); + IntegrationTestLogger.LOGGER.info("received " + frame); - assertEquals("MESSAGE", frame.getCommand()); + assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); - assertEquals("a-sub", frame.getHeader("subscription")); + assertEquals("a-sub", frame.getHeader(Stomp.Headers.Ack.SUBSCRIPTION)); - assertNotNull(frame.getHeader("message-id")); + assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID)); - assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination")); + assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Message.DESTINATION)); assertEquals("Hello World 1!", frame.getBody()); frame = newConn.receiveFrame(); - System.out.println("received " + frame); + IntegrationTestLogger.LOGGER.info("received " + frame); - //unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - newConn.sendFrame(unsubFrame); + unsubscribe(newConn, "a-sub"); newConn.disconnect(); } @Test public void testHeaderContentType() throws Exception { - connV11.connect(defUser, defPass); - ClientStompFrame frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "application/xml"); - frame.setBody("Hello World 1!"); - - connV11.sendFrame(frame); + conn.connect(defUser, defPass); + send(conn, getQueuePrefix() + getQueueName(), "application/xml", "Hello World 1!"); //subscribe StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); newConn.connect(defUser, defPass); - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); + subscribe(newConn, "a-sub"); - newConn.sendFrame(subFrame); + ClientStompFrame frame = newConn.receiveFrame(); - frame = newConn.receiveFrame(); - - System.out.println("received " + frame); + IntegrationTestLogger.LOGGER.info("received " + frame); - assertEquals("MESSAGE", frame.getCommand()); + assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); - assertEquals("application/xml", frame.getHeader("content-type")); + assertEquals("application/xml", frame.getHeader(Stomp.Headers.CONTENT_TYPE)); //unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); + unsubscribe(newConn, "a-sub"); newConn.disconnect(); } @Test public void testHeaderContentLength() throws Exception { - connV11.connect(defUser, defPass); - ClientStompFrame frame = connV11.createFrame("SEND"); + conn.connect(defUser, defPass); String body = "Hello World 1!"; String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "application/xml"); - frame.addHeader("content-length", cLen); - frame.setBody(body + "extra"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml") + .addHeader(Stomp.Headers.CONTENT_LENGTH, cLen) + .setBody(body + "extra"); - connV11.sendFrame(frame); + conn.sendFrame(frame); //subscribe StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); newConn.connect(defUser, defPass); - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); + subscribe(newConn, "a-sub"); frame = newConn.receiveFrame(); - System.out.println("received " + frame); + IntegrationTestLogger.LOGGER.info("received " + frame); - assertEquals("MESSAGE", frame.getCommand()); + assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); - assertEquals(cLen, frame.getHeader("content-length")); + assertEquals(cLen, frame.getHeader(Stomp.Headers.CONTENT_LENGTH)); //unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); + unsubscribe(newConn, "a-sub"); newConn.disconnect(); } @Test public void testHeaderEncoding() throws Exception { - connV11.connect(defUser, defPass); - ClientStompFrame frame = connV11.createFrame("SEND"); + conn.connect(defUser, defPass); String body = "Hello World 1!"; String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length); - - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "application/xml"); - frame.addHeader("content-length", cLen); String hKey = "special-header\\\\\\n\\c"; String hVal = "\\c\\\\\\ngood"; - frame.addHeader(hKey, hVal); - System.out.println("key: |" + hKey + "| val: |" + hVal + "|"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml") + .addHeader(Stomp.Headers.CONTENT_LENGTH, cLen) + .addHeader(hKey, hVal); + + IntegrationTestLogger.LOGGER.info("key: |" + hKey + "| val: |" + hVal + "|"); frame.setBody(body); - connV11.sendFrame(frame); + conn.sendFrame(frame); //subscribe StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); newConn.connect(defUser, defPass); - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); + subscribe(newConn, "a-sub"); frame = newConn.receiveFrame(); - System.out.println("received " + frame); + IntegrationTestLogger.LOGGER.info("received " + frame); - assertEquals("MESSAGE", frame.getCommand()); + assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); String value = frame.getHeader("special-header" + "\\" + "\n" + ":"); assertEquals(":" + "\\" + "\n" + "good", value); //unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); + unsubscribe(newConn, "a-sub"); newConn.disconnect(); } @@ -394,8 +353,8 @@ public class StompV11Test extends StompV11TestBase { */ @Test public void testHeaderUndefinedEscape() throws Exception { - connV11.connect(defUser, defPass); - ClientStompFrame frame = connV11.createFrame("SEND"); + conn.connect(defUser, defPass); + ClientStompFrame frame = conn.createFrame("SEND"); String body = "Hello World 1!"; String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length); @@ -411,9 +370,9 @@ public class StompV11Test extends StompV11TestBase { frame.setBody(body); - connV11.sendFrame(frame); + conn.sendFrame(frame); - ClientStompFrame error = connV11.receiveFrame(); + ClientStompFrame error = conn.receiveFrame(); System.out.println("received " + error); @@ -425,106 +384,96 @@ public class StompV11Test extends StompV11TestBase { @Test public void testHeartBeat() throws Exception { //no heart beat at all if heat-beat absent - ClientStompFrame frame = connV11.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); - ClientStompFrame reply = connV11.sendFrame(frame); + ClientStompFrame reply = conn.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); Thread.sleep(5000); - assertEquals(0, connV11.getFrameQueueSize()); + assertEquals(0, conn.getFrameQueueSize()); - connV11.disconnect(); + conn.disconnect(); //default heart beat for (0,0) which is default connection TTL (60000) / default heartBeatToTtlModifier (2.0) = 30000 - connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - frame = connV11.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "0,0"); - frame.addHeader("accept-version", "1.0,1.1"); + conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); + frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "0,0") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); - reply = connV11.sendFrame(frame); + reply = conn.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - assertEquals("0,30000", reply.getHeader("heart-beat")); + assertEquals("0,30000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); Thread.sleep(5000); - assertEquals(0, connV11.getFrameQueueSize()); + assertEquals(0, conn.getFrameQueueSize()); - connV11.disconnect(); + conn.disconnect(); //heart-beat (1,0), should receive a min client ping accepted by server - connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - frame = connV11.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "1,0"); - frame.addHeader("accept-version", "1.0,1.1"); + conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); + frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); - reply = connV11.sendFrame(frame); + reply = conn.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - assertEquals("0,500", reply.getHeader("heart-beat")); + assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); Thread.sleep(2000); //now server side should be disconnected because we didn't send ping for 2 sec - frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); - frame.setBody("Hello World"); - //send will fail try { - connV11.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World"); fail("connection should have been destroyed by now"); } catch (IOException e) { //ignore } //heart-beat (1,0), start a ping, then send a message, should be ok. - connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - frame = connV11.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "1,0"); - frame.addHeader("accept-version", "1.0,1.1"); + conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); + frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); - reply = connV11.sendFrame(frame); + reply = conn.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - assertEquals("0,500", reply.getHeader("heart-beat")); + assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); - System.out.println("========== start pinger!"); + IntegrationTestLogger.LOGGER.info("========== start pinger!"); - connV11.startPinger(500); + conn.startPinger(500); Thread.sleep(2000); //now server side should be disconnected because we didn't send ping for 2 sec - frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); - frame.setBody("Hello World"); - //send will be ok - connV11.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World"); - connV11.stopPinger(); + conn.stopPinger(); - connV11.disconnect(); + conn.disconnect(); } @@ -532,82 +481,72 @@ public class StompV11Test extends StompV11TestBase { @Test public void testHeartBeat2() throws Exception { //heart-beat (1,1) - ClientStompFrame frame = connV11.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "1,1"); - frame.addHeader("accept-version", "1.0,1.1"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,1") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); - ClientStompFrame reply = connV11.sendFrame(frame); + ClientStompFrame reply = conn.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); - assertEquals("500,500", reply.getHeader("heart-beat")); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); + assertEquals("500,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); - connV11.disconnect(); + conn.disconnect(); //heart-beat (500,1000) - connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - frame = connV11.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "500,1000"); - frame.addHeader("accept-version", "1.0,1.1"); + conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); + frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); - reply = connV11.sendFrame(frame); + reply = conn.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - assertEquals("1000,500", reply.getHeader("heart-beat")); + assertEquals("1000,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); - System.out.println("========== start pinger!"); + IntegrationTestLogger.LOGGER.info("========== start pinger!"); - connV11.startPinger(500); + conn.startPinger(500); Thread.sleep(10000); //now check the frame size - int size = connV11.getServerPingNumber(); + int size = conn.getServerPingNumber(); - System.out.println("ping received: " + size); + IntegrationTestLogger.LOGGER.info("ping received: " + size); assertTrue(size > 5); //now server side should be disconnected because we didn't send ping for 2 sec - frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); - frame.setBody("Hello World"); - //send will be ok - connV11.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World"); - connV11.disconnect(); + conn.disconnect(); } @Test public void testSendWithHeartBeatsAndReceive() throws Exception { StompClientConnection newConn = null; try { - ClientStompFrame frame = connV11.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "500,1000"); - frame.addHeader("accept-version", "1.0,1.1"); - - connV11.sendFrame(frame); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); - connV11.startPinger(500); + conn.sendFrame(frame); - frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); + conn.startPinger(500); for (int i = 0; i < 10; i++) { - frame.setBody("Hello World " + i + "!"); - connV11.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!"); Thread.sleep(500); } @@ -615,12 +554,7 @@ public class StompV11Test extends StompV11TestBase { newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); newConn.connect(defUser, defPass); - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); + subscribe(newConn, "a-sub"); int cnt = 0; @@ -635,38 +569,32 @@ public class StompV11Test extends StompV11TestBase { assertEquals(10, cnt); // unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - newConn.sendFrame(unsubFrame); + unsubscribe(newConn, "a-sub"); } finally { if (newConn != null) newConn.disconnect(); - connV11.disconnect(); + conn.disconnect(); } } @Test public void testSendAndReceiveWithHeartBeats() throws Exception { - connV11.connect(defUser, defPass); - ClientStompFrame frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); + conn.connect(defUser, defPass); for (int i = 0; i < 10; i++) { - frame.setBody("Hello World " + i + "!"); - connV11.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!"); Thread.sleep(500); } //subscribe StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); try { - frame = newConn.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "500,1000"); - frame.addHeader("accept-version", "1.0,1.1"); + ClientStompFrame frame = newConn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); newConn.sendFrame(frame); @@ -674,12 +602,7 @@ public class StompV11Test extends StompV11TestBase { Thread.sleep(500); - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); + subscribe(newConn, "a-sub"); int cnt = 0; @@ -694,9 +617,7 @@ public class StompV11Test extends StompV11TestBase { assertEquals(10, cnt); // unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - newConn.sendFrame(unsubFrame); + unsubscribe(newConn, "a-sub"); } finally { newConn.disconnect(); } @@ -706,35 +627,30 @@ public class StompV11Test extends StompV11TestBase { public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception { StompClientConnection newConn = null; try { - ClientStompFrame frame = connV11.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "500,1000"); - frame.addHeader("accept-version", "1.0,1.1"); - - connV11.sendFrame(frame); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); - connV11.startPinger(500); + conn.sendFrame(frame); - frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); + conn.startPinger(500); for (int i = 0; i < 10; i++) { - frame.setBody("Hello World " + i + "!"); - connV11.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!"); Thread.sleep(500); } // subscribe newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - frame = newConn.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "500,1000"); - frame.addHeader("accept-version", "1.0,1.1"); + frame = newConn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); newConn.sendFrame(frame); @@ -742,12 +658,7 @@ public class StompV11Test extends StompV11TestBase { Thread.sleep(500); - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); + subscribe(newConn, "a-sub"); int cnt = 0; @@ -761,13 +672,11 @@ public class StompV11Test extends StompV11TestBase { assertEquals(10, cnt); // unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - newConn.sendFrame(unsubFrame); + unsubscribe(newConn, "a-sub"); } finally { if (newConn != null) newConn.disconnect(); - connV11.disconnect(); + conn.disconnect(); } } @@ -781,14 +690,14 @@ public class StompV11Test extends StompV11TestBase { StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); //no heart beat at all if heat-beat absent - frame = connection.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); + frame = connection.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); reply = connection.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); Thread.sleep(3000); @@ -805,20 +714,20 @@ public class StompV11Test extends StompV11TestBase { //no heart beat for (0,0) connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); - frame = connection.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "0,0"); - frame.addHeader("accept-version", "1.0,1.1"); + frame = connection.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "0,0") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); reply = connection.sendFrame(frame); IntegrationTestLogger.LOGGER.info("Reply: " + reply); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - assertEquals("0,500", reply.getHeader("heart-beat")); + assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); Thread.sleep(3000); @@ -835,30 +744,25 @@ public class StompV11Test extends StompV11TestBase { //heart-beat (1,0), should receive a min client ping accepted by server connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); - frame = connection.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "1,0"); - frame.addHeader("accept-version", "1.0,1.1"); + frame = connection.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); reply = connection.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - assertEquals("0,2500", reply.getHeader("heart-beat")); + assertEquals("0,2500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); Thread.sleep(7000); //now server side should be disconnected because we didn't send ping for 2 sec - frame = connection.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); - frame.setBody("Hello World"); - //send will fail try { - connection.sendFrame(frame); + send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World"); fail("connection should have been destroyed by now"); } catch (IOException e) { //ignore @@ -866,33 +770,28 @@ public class StompV11Test extends StompV11TestBase { //heart-beat (1,0), start a ping, then send a message, should be ok. connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); - frame = connection.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "1,0"); - frame.addHeader("accept-version", "1.0,1.1"); + frame = connection.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); reply = connection.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - assertEquals("0,2500", reply.getHeader("heart-beat")); + assertEquals("0,2500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); - System.out.println("========== start pinger!"); + IntegrationTestLogger.LOGGER.info("========== start pinger!"); connection.startPinger(2500); Thread.sleep(7000); //now server side should be disconnected because we didn't send ping for 2 sec - frame = connection.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); - frame.setBody("Hello World"); - //send will be ok - connection.sendFrame(frame); + send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World"); connection.stopPinger(); @@ -900,30 +799,25 @@ public class StompV11Test extends StompV11TestBase { //heart-beat (20000,0), should receive a max client ping accepted by server connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); - frame = connection.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "20000,0"); - frame.addHeader("accept-version", "1.0,1.1"); + frame = connection.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "20000,0") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); reply = connection.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - assertEquals("0,5000", reply.getHeader("heart-beat")); + assertEquals("0,5000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); Thread.sleep(12000); //now server side should be disconnected because we didn't send ping for 2 sec - frame = connection.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); - frame.setBody("Hello World"); - //send will fail try { - connection.sendFrame(frame); + send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World"); fail("connection should have been destroyed by now"); } catch (IOException e) { //ignore @@ -940,18 +834,18 @@ public class StompV11Test extends StompV11TestBase { server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1").start(); connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); - frame = connection.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "5000,0"); - frame.addHeader("accept-version", "1.0,1.1"); + frame = connection.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "5000,0") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); reply = connection.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - assertEquals("0,5000", reply.getHeader("heart-beat")); + assertEquals("0,5000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); Thread.sleep(6000); @@ -966,18 +860,18 @@ public class StompV11Test extends StompV11TestBase { server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1.5").start(); connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); - frame = connection.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "5000,0"); - frame.addHeader("accept-version", "1.0,1.1"); + frame = connection.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "5000,0") + .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); reply = connection.sendFrame(frame); - assertEquals("CONNECTED", reply.getCommand()); + assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - assertEquals("0,5000", reply.getHeader("heart-beat")); + assertEquals("0,5000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); Thread.sleep(6000); @@ -986,21 +880,21 @@ public class StompV11Test extends StompV11TestBase { @Test public void testNack() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV11, "sub1", "client"); + subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV11.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - String messageID = frame.getHeader("message-id"); + String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID); - nack(connV11, "sub1", messageID); + nack(conn, "sub1", messageID); - unsubscribe(connV11, "sub1"); + unsubscribe(conn, "sub1"); - connV11.disconnect(); + conn.disconnect(); //Nack makes the message be dropped. MessageConsumer consumer = session.createConsumer(queue); @@ -1010,25 +904,25 @@ public class StompV11Test extends StompV11TestBase { @Test public void testNackWithWrongSubId() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV11, "sub1", "client"); + subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV11.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - String messageID = frame.getHeader("message-id"); + String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID); - nack(connV11, "sub2", messageID); + nack(conn, "sub2", messageID); - ClientStompFrame error = connV11.receiveFrame(); + ClientStompFrame error = conn.receiveFrame(); - System.out.println("Receiver error: " + error); + IntegrationTestLogger.LOGGER.info("Receiver error: " + error); - unsubscribe(connV11, "sub1"); + unsubscribe(conn, "sub1"); - connV11.disconnect(); + conn.disconnect(); //message should be still there MessageConsumer consumer = session.createConsumer(queue); @@ -1038,25 +932,25 @@ public class StompV11Test extends StompV11TestBase { @Test public void testNackWithWrongMessageId() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV11, "sub1", "client"); + subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV11.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - frame.getHeader("message-id"); + frame.getHeader(Stomp.Headers.Message.MESSAGE_ID); - nack(connV11, "sub2", "someother"); + nack(conn, "sub2", "someother"); - ClientStompFrame error = connV11.receiveFrame(); + ClientStompFrame error = conn.receiveFrame(); - System.out.println("Receiver error: " + error); + IntegrationTestLogger.LOGGER.info("Receiver error: " + error); - unsubscribe(connV11, "sub1"); + unsubscribe(conn, "sub1"); - connV11.disconnect(); + conn.disconnect(); //message should still there MessageConsumer consumer = session.createConsumer(queue); @@ -1066,21 +960,21 @@ public class StompV11Test extends StompV11TestBase { @Test public void testAck() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV11, "sub1", "client"); + subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV11.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - String messageID = frame.getHeader("message-id"); + String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID); - ack(connV11, "sub1", messageID, null); + ack(conn, "sub1", messageID, null); - unsubscribe(connV11, "sub1"); + unsubscribe(conn, "sub1"); - connV11.disconnect(); + conn.disconnect(); //Nack makes the message be dropped. MessageConsumer consumer = session.createConsumer(queue); @@ -1090,25 +984,25 @@ public class StompV11Test extends StompV11TestBase { @Test public void testAckWithWrongSubId() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV11, "sub1", "client"); + subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV11.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - String messageID = frame.getHeader("message-id"); + String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID); - ack(connV11, "sub2", messageID, null); + ack(conn, "sub2", messageID, null); - ClientStompFrame error = connV11.receiveFrame(); + ClientStompFrame error = conn.receiveFrame(); - System.out.println("Receiver error: " + error); + IntegrationTestLogger.LOGGER.info("Receiver error: " + error); - unsubscribe(connV11, "sub1"); + unsubscribe(conn, "sub1"); - connV11.disconnect(); + conn.disconnect(); //message should be still there MessageConsumer consumer = session.createConsumer(queue); @@ -1118,25 +1012,25 @@ public class StompV11Test extends StompV11TestBase { @Test public void testAckWithWrongMessageId() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV11, "sub1", "client"); + subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV11.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - frame.getHeader("message-id"); + frame.getHeader(Stomp.Headers.Message.MESSAGE_ID); - ack(connV11, "sub2", "someother", null); + ack(conn, "sub2", "someother", null); - ClientStompFrame error = connV11.receiveFrame(); + ClientStompFrame error = conn.receiveFrame(); - System.out.println("Receiver error: " + error); + IntegrationTestLogger.LOGGER.info("Receiver error: " + error); - unsubscribe(connV11, "sub1"); + unsubscribe(conn, "sub1"); - connV11.disconnect(); + conn.disconnect(); //message should still there MessageConsumer consumer = session.createConsumer(queue); @@ -1146,33 +1040,33 @@ public class StompV11Test extends StompV11TestBase { @Test public void testErrorWithReceipt() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV11, "sub1", "client"); + subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV11.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - String messageID = frame.getHeader("message-id"); + String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID); - ClientStompFrame ackFrame = connV11.createFrame("ACK"); //give it a wrong sub id - ackFrame.addHeader("subscription", "sub2"); - ackFrame.addHeader("message-id", messageID); - ackFrame.addHeader("receipt", "answer-me"); + ClientStompFrame ackFrame = conn.createFrame(Stomp.Commands.ACK) + .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, "sub2") + .addHeader(Stomp.Headers.Message.MESSAGE_ID, messageID) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, "answer-me"); - ClientStompFrame error = connV11.sendFrame(ackFrame); + ClientStompFrame error = conn.sendFrame(ackFrame); - System.out.println("Receiver error: " + error); + IntegrationTestLogger.LOGGER.info("Receiver error: " + error); - assertEquals("ERROR", error.getCommand()); + assertEquals(Stomp.Responses.ERROR, error.getCommand()); - assertEquals("answer-me", error.getHeader("receipt-id")); + assertEquals("answer-me", error.getHeader(Stomp.Headers.Response.RECEIPT_ID)); - unsubscribe(connV11, "sub1"); + unsubscribe(conn, "sub1"); - connV11.disconnect(); + conn.disconnect(); //message should still there MessageConsumer consumer = session.createConsumer(queue); @@ -1182,33 +1076,33 @@ public class StompV11Test extends StompV11TestBase { @Test public void testErrorWithReceipt2() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV11, "sub1", "client"); + subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV11.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - String messageID = frame.getHeader("message-id"); + String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID); - ClientStompFrame ackFrame = connV11.createFrame("ACK"); //give it a wrong sub id - ackFrame.addHeader("subscription", "sub1"); - ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) + 1)); - ackFrame.addHeader("receipt", "answer-me"); + ClientStompFrame ackFrame = conn.createFrame(Stomp.Commands.ACK) + .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, "sub1") + .addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(Long.valueOf(messageID) + 1)) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, "answer-me"); - ClientStompFrame error = connV11.sendFrame(ackFrame); + ClientStompFrame error = conn.sendFrame(ackFrame); - System.out.println("Receiver error: " + error); + IntegrationTestLogger.LOGGER.info("Receiver error: " + error); - assertEquals("ERROR", error.getCommand()); + assertEquals(Stomp.Responses.ERROR, error.getCommand()); - assertEquals("answer-me", error.getHeader("receipt-id")); + assertEquals("answer-me", error.getHeader(Stomp.Headers.Response.RECEIPT_ID)); - unsubscribe(connV11, "sub1"); + unsubscribe(conn, "sub1"); - connV11.disconnect(); + conn.disconnect(); //message should still there MessageConsumer consumer = session.createConsumer(queue); @@ -1218,29 +1112,29 @@ public class StompV11Test extends StompV11TestBase { @Test public void testAckModeClient() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV11, "sub1", "client"); + subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT); int num = 50; //send a bunch of messages for (int i = 0; i < num; i++) { - this.sendMessage("client-ack" + i); + this.sendJmsMessage("client-ack" + i); } ClientStompFrame frame = null; for (int i = 0; i < num; i++) { - frame = connV11.receiveFrame(); + frame = conn.receiveFrame(); assertNotNull(frame); } //ack the last - this.ack(connV11, "sub1", frame); + this.ack(conn, "sub1", frame); - unsubscribe(connV11, "sub1"); + unsubscribe(conn, "sub1"); - connV11.disconnect(); + conn.disconnect(); //no messages can be received. MessageConsumer consumer = session.createConsumer(queue); @@ -1250,31 +1144,31 @@ public class StompV11Test extends StompV11TestBase { @Test public void testAckModeClient2() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV11, "sub1", "client"); + subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT); int num = 50; //send a bunch of messages for (int i = 0; i < num; i++) { - this.sendMessage("client-ack" + i); + this.sendJmsMessage("client-ack" + i); } ClientStompFrame frame = null; for (int i = 0; i < num; i++) { - frame = connV11.receiveFrame(); + frame = conn.receiveFrame(); assertNotNull(frame); //ack the 49th if (i == num - 2) { - this.ack(connV11, "sub1", frame); + this.ack(conn, "sub1", frame); } } - unsubscribe(connV11, "sub1"); + unsubscribe(conn, "sub1"); - connV11.disconnect(); + conn.disconnect(); //no messages can be received. MessageConsumer consumer = session.createConsumer(queue); @@ -1286,26 +1180,26 @@ public class StompV11Test extends StompV11TestBase { @Test public void testAckModeAuto() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV11, "sub1", "auto"); + subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO); int num = 50; //send a bunch of messages for (int i = 0; i < num; i++) { - this.sendMessage("auto-ack" + i); + this.sendJmsMessage("auto-ack" + i); } ClientStompFrame frame = null; for (int i = 0; i < num; i++) { - frame = connV11.receiveFrame(); + frame = conn.receiveFrame(); assertNotNull(frame); } - unsubscribe(connV11, "sub1"); + unsubscribe(conn, "sub1"); - connV11.disconnect(); + conn.disconnect(); //no messages can be received. MessageConsumer consumer = session.createConsumer(queue); @@ -1315,32 +1209,32 @@ public class StompV11Test extends StompV11TestBase { @Test public void testAckModeClientIndividual() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV11, "sub1", "client-individual"); + subscribe(conn, "sub1", "client-individual"); int num = 50; //send a bunch of messages for (int i = 0; i < num; i++) { - this.sendMessage("client-individual-ack" + i); + this.sendJmsMessage("client-individual-ack" + i); } ClientStompFrame frame = null; for (int i = 0; i < num; i++) { - frame = connV11.receiveFrame(); + frame = conn.receiveFrame(); assertNotNull(frame); - System.out.println(i + " == received: " + frame); + IntegrationTestLogger.LOGGER.info(i + " == received: " + frame); //ack on even numbers if (i % 2 == 0) { - this.ack(connV11, "sub1", frame); + this.ack(conn, "sub1", frame); } } - unsubscribe(connV11, "sub1"); + unsubscribe(conn, "sub1"); - connV11.disconnect(); + conn.disconnect(); //no messages can be received. MessageConsumer consumer = session.createConsumer(queue); @@ -1349,7 +1243,7 @@ public class StompV11Test extends StompV11TestBase { for (int i = 0; i < num / 2; i++) { message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); - System.out.println("Legal: " + message.getText()); + IntegrationTestLogger.LOGGER.info("Legal: " + message.getText()); } message = (TextMessage) consumer.receive(1000); @@ -1359,64 +1253,55 @@ public class StompV11Test extends StompV11TestBase { @Test public void testTwoSubscribers() throws Exception { - connV11.connect(defUser, defPass, CLIENT_ID); + conn.connect(defUser, defPass, CLIENT_ID); - this.subscribeTopic(connV11, "sub1", "auto", null); + this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); newConn.connect(defUser, defPass, "myclientid2"); - this.subscribeTopic(newConn, "sub2", "auto", null); - - ClientStompFrame frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getTopicPrefix() + getTopicName()); - - frame.setBody("Hello World"); + this.subscribeTopic(newConn, "sub2", Stomp.Headers.Subscribe.AckModeValues.AUTO, null); - connV11.sendFrame(frame); + send(conn, getTopicPrefix() + getTopicName(), null, "Hello World"); // receive message from socket - frame = connV11.receiveFrame(1000); + ClientStompFrame frame = conn.receiveFrame(1000); - System.out.println("received frame : " + frame); + IntegrationTestLogger.LOGGER.info("received frame : " + frame); assertEquals("Hello World", frame.getBody()); - assertEquals("sub1", frame.getHeader("subscription")); + assertEquals("sub1", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION)); frame = newConn.receiveFrame(1000); - System.out.println("received 2 frame : " + frame); + IntegrationTestLogger.LOGGER.info("received 2 frame : " + frame); assertEquals("Hello World", frame.getBody()); - assertEquals("sub2", frame.getHeader("subscription")); + assertEquals("sub2", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION)); // remove suscription - this.unsubscribe(connV11, "sub1", true); + this.unsubscribe(conn, "sub1", true); this.unsubscribe(newConn, "sub2", true); - connV11.disconnect(); + conn.disconnect(); newConn.disconnect(); } @Test public void testSendAndReceiveOnDifferentConnections() throws Exception { - connV11.connect(defUser, defPass); - - ClientStompFrame sendFrame = connV11.createFrame("SEND"); - sendFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - sendFrame.setBody("Hello World"); + conn.connect(defUser, defPass); - connV11.sendFrame(sendFrame); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); connV11_2.connect(defUser, defPass); - this.subscribe(connV11_2, "sub1", "auto"); + this.subscribe(connV11_2, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO); ClientStompFrame frame = connV11_2.receiveFrame(2000); - assertEquals("MESSAGE", frame.getCommand()); + assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); assertEquals("Hello World", frame.getBody()); - connV11.disconnect(); + conn.disconnect(); connV11_2.disconnect(); } @@ -1424,79 +1309,81 @@ public class StompV11Test extends StompV11TestBase { @Test public void testBeginSameTransactionTwice() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - beginTransaction(connV11, "tx1"); + beginTransaction(conn, "tx1"); - beginTransaction(connV11, "tx1"); + beginTransaction(conn, "tx1"); - ClientStompFrame f = connV11.receiveFrame(); - Assert.assertTrue(f.getCommand().equals("ERROR")); + ClientStompFrame f = conn.receiveFrame(); + Assert.assertTrue(f.getCommand().equals(Stomp.Responses.ERROR)); } @Test public void testBodyWithUTF8() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - this.subscribe(connV11, getName(), "auto"); + this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.AUTO); String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C"; - System.out.println(text); - sendMessage(text); + IntegrationTestLogger.LOGGER.info(text); + sendJmsMessage(text); - ClientStompFrame frame = connV11.receiveFrame(); - System.out.println(frame); - Assert.assertTrue(frame.getCommand().equals("MESSAGE")); - Assert.assertNotNull(frame.getHeader("destination")); + ClientStompFrame frame = conn.receiveFrame(); + IntegrationTestLogger.LOGGER.info(frame); + Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE)); + Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION)); Assert.assertTrue(frame.getBody().equals(text)); - connV11.disconnect(); + conn.disconnect(); } @Test public void testClientAckNotPartOfTransaction() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - this.subscribe(connV11, getName(), "client"); + this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.CLIENT); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV11.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - Assert.assertTrue(frame.getCommand().equals("MESSAGE")); - Assert.assertNotNull(frame.getHeader("destination")); + Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE)); + Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION)); Assert.assertTrue(frame.getBody().equals(getName())); - Assert.assertNotNull(frame.getHeader("message-id")); + Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID)); - String messageID = frame.getHeader("message-id"); + String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID); - beginTransaction(connV11, "tx1"); + beginTransaction(conn, "tx1"); - this.ack(connV11, getName(), messageID, "tx1"); + this.ack(conn, getName(), messageID, "tx1"); - abortTransaction(connV11, "tx1"); + abortTransaction(conn, "tx1"); - frame = connV11.receiveFrame(500); + frame = conn.receiveFrame(500); assertNull(frame); - this.unsubscribe(connV11, getName()); + this.unsubscribe(conn, getName()); - connV11.disconnect(); + conn.disconnect(); } @Test public void testDisconnectAndError() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); + + this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.CLIENT); - this.subscribe(connV11, getName(), "client"); + String uuid = UUID.randomUUID().toString(); - ClientStompFrame frame = connV11.createFrame("DISCONNECT"); - frame.addHeader("receipt", "1"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.DISCONNECT) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); - ClientStompFrame result = connV11.sendFrame(frame); + ClientStompFrame result = conn.sendFrame(frame); - if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) { + if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) { fail("Disconnect failed! " + result); } @@ -1505,12 +1392,9 @@ public class StompV11Test extends StompV11TestBase { Thread thr = new Thread() { @Override public void run() { - ClientStompFrame sendFrame = connV11.createFrame("SEND"); - sendFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - sendFrame.setBody("Hello World"); while (latch.getCount() != 0) { try { - connV11.sendFrame(sendFrame); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); Thread.sleep(500); } catch (InterruptedException e) { //retry @@ -1523,7 +1407,7 @@ public class StompV11Test extends StompV11TestBase { latch.countDown(); break; } finally { - connV11.destroy(); + conn.destroy(); } } } @@ -1543,66 +1427,68 @@ public class StompV11Test extends StompV11TestBase { @Test public void testDurableSubscriber() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - this.subscribe(connV11, "sub1", "client", getName()); + this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName()); - this.subscribe(connV11, "sub1", "client", getName()); + this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName()); - ClientStompFrame frame = connV11.receiveFrame(); - Assert.assertTrue(frame.getCommand().equals("ERROR")); + ClientStompFrame frame = conn.receiveFrame(); + Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR)); - connV11.disconnect(); + conn.disconnect(); } @Test public void testDurableSubscriberWithReconnection() throws Exception { - connV11.connect(defUser, defPass, CLIENT_ID); + conn.connect(defUser, defPass, CLIENT_ID); - this.subscribeTopic(connV11, "sub1", "auto", getName()); + this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName()); - ClientStompFrame frame = connV11.createFrame("DISCONNECT"); - frame.addHeader("receipt", "1"); + String uuid = UUID.randomUUID().toString(); - ClientStompFrame result = connV11.sendFrame(frame); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.DISCONNECT) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); - if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) { + ClientStompFrame result = conn.sendFrame(frame); + + if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) { fail("Disconnect failed! " + result); } // send the message when the durable subscriber is disconnected - sendMessage(getName(), topic); + sendJmsMessage(getName(), topic); - connV11.destroy(); - connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - connV11.connect(defUser, defPass, CLIENT_ID); + conn.destroy(); + conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); + conn.connect(defUser, defPass, CLIENT_ID); - this.subscribeTopic(connV11, "sub1", "auto", getName()); + this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName()); // we must have received the message - frame = connV11.receiveFrame(); + frame = conn.receiveFrame(); - Assert.assertTrue(frame.getCommand().equals("MESSAGE")); - Assert.assertNotNull(frame.getHeader("destination")); + Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE)); + Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION)); Assert.assertEquals(getName(), frame.getBody()); - this.unsubscribe(connV11, "sub1"); + this.unsubscribe(conn, "sub1"); - connV11.disconnect(); + conn.disconnect(); } @Test public void testDurableUnSubscribe() throws Exception { - connV11.connect(defUser, defPass, CLIENT_ID); + conn.connect(defUser, defPass, CLIENT_ID); - this.subscribeTopic(connV11, null, "auto", getName()); + this.subscribeTopic(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, getName()); - connV11.disconnect(); - connV11.destroy(); - connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - connV11.connect(defUser, defPass, CLIENT_ID); + conn.disconnect(); + conn.destroy(); + conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); + conn.connect(defUser, defPass, CLIENT_ID); - this.unsubscribe(connV11, getName(), false, true); + this.unsubscribe(conn, getName(), null, false, true); long start = System.currentTimeMillis(); SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName()); @@ -1612,21 +1498,21 @@ public class StompV11Test extends StompV11TestBase { assertNull(server.getActiveMQServer().locateQueue(queueName)); - connV11.disconnect(); + conn.disconnect(); } @Test public void testJMSXGroupIdCanBeSet() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - ClientStompFrame frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("JMSXGroupID", "TEST"); - frame.setBody("Hello World"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader("JMSXGroupID", "TEST") + .setBody("Hello World"); - connV11.sendFrame(frame); + conn.sendFrame(frame); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -1640,64 +1526,64 @@ public class StompV11Test extends StompV11TestBase { int ctr = 10; String[] data = new String[ctr]; - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - this.subscribe(connV11, "sub1", "auto"); + this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO); for (int i = 0; i < ctr; ++i) { data[i] = getName() + i; - sendMessage(data[i]); + sendJmsMessage(data[i]); } ClientStompFrame frame = null; for (int i = 0; i < ctr; ++i) { - frame = connV11.receiveFrame(); + frame = conn.receiveFrame(); Assert.assertTrue("Message not in order", frame.getBody().equals(data[i])); } for (int i = 0; i < ctr; ++i) { data[i] = getName() + ":second:" + i; - sendMessage(data[i]); + sendJmsMessage(data[i]); } for (int i = 0; i < ctr; ++i) { - frame = connV11.receiveFrame(); + frame = conn.receiveFrame(); Assert.assertTrue("Message not in order", frame.getBody().equals(data[i])); } - connV11.disconnect(); + conn.disconnect(); } @Test public void testSubscribeWithAutoAckAndSelector() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - this.subscribe(connV11, "sub1", "auto", null, "foo = 'zzz'"); + this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "foo = 'zzz'"); - sendMessage("Ignored message", "foo", "1234"); - sendMessage("Real message", "foo", "zzz"); + sendJmsMessage("Ignored message", "foo", "1234"); + sendJmsMessage("Real message", "foo", "zzz"); - ClientStompFrame frame = connV11.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message")); - connV11.disconnect(); + conn.disconnect(); } @Test public void testRedeliveryWithClientAck() throws Exception { - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - this.subscribe(connV11, "subId", "client"); + this.subscribe(conn, "subscriptionId", Stomp.Headers.Subscribe.AckModeValues.CLIENT); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV11.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - assertTrue(frame.getCommand().equals("MESSAGE")); + assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE)); - connV11.disconnect(); + conn.disconnect(); // message should be received since message was not acknowledged MessageConsumer consumer = session.createConsumer(queue); @@ -1710,7 +1596,7 @@ public class StompV11Test extends StompV11TestBase { public void testSendManyMessages() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); int count = 1000; final CountDownLatch latch = new CountDownLatch(count); @@ -1721,30 +1607,22 @@ public class StompV11Test extends StompV11TestBase { } }); - ClientStompFrame frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.setBody("Hello World"); - for (int i = 1; i <= count; i++) { - connV11.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); } assertTrue(latch.await(60, TimeUnit.SECONDS)); - connV11.disconnect(); + conn.disconnect(); } @Test public void testSendMessage() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - ClientStompFrame frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.setBody("Hello World"); - - connV11.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -1763,18 +1641,16 @@ public class StompV11Test extends StompV11TestBase { public void testSendMessageWithContentLength() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); byte[] data = new byte[]{1, 0, 0, 4}; - ClientStompFrame frame = connV11.createFrame("SEND"); - - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.setBody(new String(data, StandardCharsets.UTF_8)); - - frame.addHeader("content-length", String.valueOf(data.length)); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .setBody(new String(data, StandardCharsets.UTF_8)) + .addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length)); - connV11.sendFrame(frame); + conn.sendFrame(frame); BytesMessage message = (BytesMessage) consumer.receive(10000); Assert.assertNotNull(message); @@ -1790,16 +1666,15 @@ public class StompV11Test extends StompV11TestBase { public void testSendMessageWithCustomHeadersAndSelector() throws Exception { MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'"); - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - ClientStompFrame frame = connV11.createFrame("SEND"); - frame.addHeader("foo", "abc"); - frame.addHeader("bar", "123"); - - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.setBody("Hello World"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader("foo", "abc") + .addHeader("bar", "123") + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .setBody("Hello World"); - connV11.sendFrame(frame); + conn.sendFrame(frame); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -1812,14 +1687,13 @@ public class StompV11Test extends StompV11TestBase { public void testSendMessageWithLeadingNewLine() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - ClientStompFrame frame = connV11.createFrame("SEND"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .setBody("Hello World"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.setBody("Hello World"); - - connV11.sendWickedFrame(frame); + conn.sendWickedFrame(frame); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -1833,24 +1707,17 @@ public class StompV11Test extends StompV11TestBase { assertNull(consumer.receive(1000)); - connV11.disconnect(); + conn.disconnect(); } @Test public void testSendMessageWithReceipt() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - ClientStompFrame frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("receipt", "1234"); - frame.setBody("Hello World"); - - frame = connV11.sendFrame(frame); - assertTrue(frame.getCommand().equals("RECEIPT")); - assertEquals("1234", frame.getHeader("receipt-id")); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -1862,28 +1729,27 @@ public class StompV11Test extends StompV11TestBase { long tmsg = message.getJMSTimestamp(); Assert.assertTrue(Math.abs(tnow - tmsg) < 1000); - connV11.disconnect(); + conn.disconnect(); } @Test public void testSendMessageWithStandardHeaders() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV11.connect(defUser, defPass); + conn.connect(defUser, defPass); - ClientStompFrame frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("correlation-id", "c123"); - frame.addHeader("persistent", "true"); - frame.addHeader("priority", "3"); - frame.addHeader("type", "t345"); - frame.addHeader("JMSXGroupID", "abc"); - frame.addHeader("foo", "abc"); - frame.addHeader("bar", "123"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader(Stomp.Headers.Message.CORRELATION_ID, "c123") + .addHeader(Stomp.Headers.Message.PERSISTENT, "true") + .addHeader(Stomp.Headers.Message.PRIORITY, "3") + .addHeader(Stomp.Headers.Message.TYPE, "t345") + .addHeader("JMSXGroupID", "abc") + .addHeader("foo", "abc") + .addHeader("bar", "123") + .setBody("Hello World"); - frame.setBody("Hello World"); - - frame = connV11.sendFrame(frame); + frame = conn.sendFrame(frame); TextMessage message = (TextMessage) consumer
<TRUNCATED>
