Author: gtully
Date: Fri Feb 10 20:18:50 2012
New Revision: 1242911
URL: http://svn.apache.org/viewvc?rev=1242911&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3653 - tidy up StompTest, resolve
content-length handling for stomp+nio, and fix the broken stompnio and stompniossl tests
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1242911&r1=1242910&r2=1242911&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
Fri Feb 10 20:18:50 2012
@@ -583,7 +583,7 @@ public class ProtocolConverter {
HashSet<String> acceptsVersions = new
HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA)));
acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
if (acceptsVersions.isEmpty()) {
- throw new ProtocolException("Invlid Protocol version, supported
versions are: " +
+ throw new ProtocolException("Invalid Protocol version[" + accepts
+"], supported versions are: " +
Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
} else {
this.version = Collections.max(acceptsVersions);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java?rev=1242911&r1=1242910&r2=1242911&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
Fri Feb 10 20:18:50 2012
@@ -58,7 +58,7 @@ public class StompCodec {
action =
((StompWireFormat)transport.getWireFormat()).parseAction(data);
headers =
((StompWireFormat)transport.getWireFormat()).parseHeaders(data);
String contentLengthHeader =
headers.get(Stomp.Headers.CONTENT_LENGTH);
- if (contentLengthHeader != null) {
+ if ((action.equals(Stomp.Commands.SEND) ||
action.equals(Stomp.Responses.MESSAGE)) && contentLengthHeader != null) {
contentLength =
((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader);
} else {
contentLength = -1;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=1242911&r1=1242910&r2=1242911&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
Fri Feb 10 20:18:50 2012
@@ -52,7 +52,6 @@ public class StompConnection {
byte[] bytes = data.getBytes("UTF-8");
OutputStream outputStream = stompSocket.getOutputStream();
outputStream.write(bytes);
- outputStream.write(0);
outputStream.flush();
}
@@ -61,7 +60,6 @@ public class StompConnection {
OutputStream outputStream = stompSocket.getOutputStream();
outputStream.write(bytes);
outputStream.write(data);
- outputStream.write(0);
outputStream.flush();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java?rev=1242911&r1=1242910&r2=1242911&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
Fri Feb 10 20:18:50 2012
@@ -205,6 +205,8 @@ public class StompFrame implements Comma
buffer.append(Arrays.toString(getContent()));
}
}
+ // terminate the frame
+ buffer.append('\u0000');
return buffer.toString();
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1242911&r1=1242910&r2=1242911&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Fri Feb 10 20:18:50 2012
@@ -24,6 +24,7 @@ import org.apache.activemq.broker.jmx.Br
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +37,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -47,10 +49,10 @@ public class StompTest extends Combinati
protected String jmsUri = "vm://localhost";
private BrokerService broker;
- private StompConnection stompConnection = new StompConnection();
- private Connection connection;
- private Session session;
- private ActiveMQQueue queue;
+ protected StompConnection stompConnection = new StompConnection();
+ protected Connection connection;
+ protected Session session;
+ protected ActiveMQQueue queue;
private final String xmlObject = "<pojo>\n"
+ " <name>Dejan</name>\n"
+ " <city>Belgrade</city>\n"
@@ -115,7 +117,7 @@ public class StompTest extends Combinati
connection.start();
}
- private void stompConnect() throws IOException, URISyntaxException,
UnknownHostException {
+ protected void stompConnect() throws IOException, URISyntaxException,
UnknownHostException {
URI connectUri = new URI(bindAddress);
stompConnection.open(createSocket(connectUri));
}
@@ -146,7 +148,7 @@ public class StompTest extends Combinati
}
}
- private void stompDisconnect() throws IOException {
+ protected void stompDisconnect() throws IOException {
if (stompConnection != null) {
stompConnection.close();
stompConnection = null;
@@ -351,8 +353,6 @@ public class StompTest extends Combinati
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
receiver.sendFrame(frame);
- waitForFrameToTakeEffect();
-
MessageConsumer consumer = session.createConsumer(queue);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
"receipt: msg-1\n" + "\n\n" + "Hello World" + Stomp.NULL;
@@ -362,7 +362,7 @@ public class StompTest extends Combinati
assertTrue(frame.startsWith("RECEIPT"));
assertTrue("Receipt contains correct receipt-id",
frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
- TextMessage message = (TextMessage)consumer.receive(2500);
+ TextMessage message = (TextMessage)consumer.receive(10000);
assertNotNull(message);
assertNull("JMS Message does not contain receipt request",
message.getStringProperty(Stomp.Headers.RECEIPT_REQUESTED));
@@ -388,7 +388,7 @@ public class StompTest extends Combinati
frame = sender.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
- frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n"
+ "receipt: " + (receiptId++) + "\n\n" + "Hello World:" + (count++) +
Stomp.NULL;
+ frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n"
+ "receipt: " + (receiptId++) + "\n" + "Hello World:" + (count++) + "\n\n" +
Stomp.NULL;
sender.sendFrame(frame);
frame = sender.receiveFrame();
assertTrue("" + frame, frame.startsWith("RECEIPT"));
@@ -586,7 +586,7 @@ public class StompTest extends Combinati
}
// sleep a while before publishing another set of messages
- waitForFrameToTakeEffect();
+ TimeUnit.SECONDS.sleep(2);
for (int i = 0; i < ctr; ++i) {
data[i] = getName() + ":second:" + i;
@@ -729,7 +729,7 @@ public class StompTest extends Combinati
assertTrue(message.getJMSRedelivered());
}
- public void testSubscribeWithClientAckAndContentLength() throws Exception {
+ public void testSubscribeWithClientAckedAndContentLength() throws
Exception {
String frame = "CONNECT\n" + "login: system\n" + "passcode:
manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@@ -753,8 +753,14 @@ public class StompTest extends Combinati
StompFrame ack = new StompFrame("ACK", ackHeaders);
stompConnection.sendFrame(ack.format());
- // Need some time for the Ack to get processed.
- waitForFrameToTakeEffect();
+ final QueueViewMBean queueView = getProxyToQueue(getQueueName());
+ assertTrue("dequeue complete", Wait.waitFor(new Wait.Condition(){
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("queueView, enqueue:" + queueView.getEnqueueCount()
+", dequeue:" + queueView.getDequeueCount() + ", inflight:" +
queueView.getInFlightCount());
+ return queueView.getDequeueCount() == 1;
+ }
+ }));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@@ -785,10 +791,11 @@ public class StompTest extends Combinati
assertTrue(frame.startsWith("MESSAGE"));
// remove suscription
- frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() +
"\n" + "\n\n" + Stomp.NULL;
+ frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() +
"\n" + "receipt:1" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
- waitForFrameToTakeEffect();
+ frame = stompConnection.receiveFrame();
+ assertTrue("" + frame, frame.startsWith("RECEIPT"));
// send a message to our queue
sendMessage("second message");
@@ -819,9 +826,7 @@ public class StompTest extends Combinati
frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
- waitForFrameToTakeEffect();
-
- TextMessage message = (TextMessage)consumer.receive(2500);
+ TextMessage message = (TextMessage)consumer.receive(10000);
assertNotNull("Should have received a message", message);
}
@@ -853,11 +858,8 @@ public class StompTest extends Combinati
frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
- // This test case is currently failing
- waitForFrameToTakeEffect();
-
// only second msg should be received since first msg was rolled back
- TextMessage message = (TextMessage)consumer.receive(2500);
+ TextMessage message = (TextMessage)consumer.receive(10000);
assertNotNull(message);
assertEquals("second message", message.getText().trim());
}
@@ -868,16 +870,11 @@ public class StompTest extends Combinati
stompConnection.sendFrame(frame);
- // This test case is currently failing
- waitForFrameToTakeEffect();
-
assertClients(2);
// now lets kill the stomp connection
stompConnection.close();
- Thread.sleep(2000);
-
assertClients(1);
}
@@ -1486,8 +1483,6 @@ public class StompTest extends Combinati
stompConnection.ack(frame5, "tx3");
stompConnection.commit("tx3");
- waitForFrameToTakeEffect();
-
stompDisconnect();
}
@@ -1725,9 +1720,13 @@ public class StompTest extends Combinati
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
- waitForFrameToTakeEffect();
-
- QueueViewMBean queueView = getProxyToQueue(getQueueName());
+ final QueueViewMBean queueView = getProxyToQueue(getQueueName());
+ Wait.waitFor(new Wait.Condition(){
+ @Override
+ public boolean isSatisified() throws Exception {
+ return queueView.getDequeueCount() == 2;
+ }
+ });
assertEquals(2, queueView.getDispatchCount());
assertEquals(2, queueView.getDequeueCount());
assertEquals(0, queueView.getQueueSize());
@@ -1896,7 +1895,14 @@ public class StompTest extends Combinati
return proxy;
}
- protected void assertClients(int expected) throws Exception {
+ protected void assertClients(final int expected) throws Exception {
+ Wait.waitFor(new Wait.Condition()
+ {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return broker.getBroker().getClients().length == expected;
+ }
+ });
org.apache.activemq.broker.Connection[] clients =
broker.getBroker().getClients();
int actual = clients.length;
@@ -1936,8 +1942,6 @@ public class StompTest extends Combinati
frame = "SUBSCRIBE\n" + "destination:/queue/test.DEV-3485\n" +
"ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
- waitForFrameToTakeEffect();
-
stompConnection.sendFrame(test);
// We only want one of them, to trigger the shutdown and potentially
@@ -1959,15 +1963,6 @@ public class StompTest extends Combinati
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
- waitForFrameToTakeEffect();
-
stompConnection.close();
}
-
- protected void waitForFrameToTakeEffect() throws InterruptedException {
- // bit of a dirty hack :)
- // another option would be to force some kind of receipt to be returned
- // from the frame
- Thread.sleep(2000);
- }
}