Author: dejanb
Date: Tue Dec 16 03:02:28 2008
New Revision: 727017
URL: http://svn.apache.org/viewvc?rev=727017&view=rev
Log:
fix for prefetch size issue reported in AMQ-1807
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=727017&r1=727016&r2=727017&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Dec 16 03:02:28 2008
@@ -243,11 +243,11 @@
// consumer
if (getPrefetchSize() != 0) {
prefetchExtension = Math.max(
- prefetchExtension, index + 1);
+ prefetchExtension, index );
}
} else {
prefetchExtension = Math.max(0,
- prefetchExtension - (index + 1));
+ prefetchExtension - index);
}
destination = node.getRegionDestination();
callDispatchMatched = true;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=727017&r1=727016&r2=727017&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
Tue Dec 16 03:02:28 2008
@@ -112,6 +112,11 @@
headers.put("passcode", password);
StompFrame frame = new StompFrame("CONNECT", headers);
sendFrame(frame.toString());
+
+ StompFrame connect = receive();
+ if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
+ throw new Exception ("Not connected: " + connect.getBody());
+ }
}
public void disconnect() throws Exception {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=727017&r1=727016&r2=727017&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Tue Dec 16 03:02:28 2008
@@ -884,7 +884,7 @@
stompConnection.sendFrame(frame);
// wait a bit for MBean to get refreshed
try {
- Thread.sleep(100);
+ Thread.sleep(200);
} catch (InterruptedException e){}
assertEquals(view.getDurableTopicSubscribers().length, 1);
@@ -892,7 +892,7 @@
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
try {
- Thread.sleep(100);
+ Thread.sleep(200);
} catch (InterruptedException e){}
//reconnect
@@ -920,17 +920,41 @@
stompConnection.begin("tx1");
stompConnection.send("/queue/" + getQueueName(), "msg", "tx1", null);
stompConnection.commit("tx1");
-
- StompFrame connect = stompConnection.receive();
- if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
- throw new Exception ("Not connected");
- }
stompConnection.subscribe("/queue/" + getQueueName());
StompFrame stompMessage = stompConnection.receive();
assertNull(stompMessage.getHeaders().get("transaction"));
}
+ public void testPrefetchSize() throws Exception {
+ stompConnection.connect("system", "manager");
+
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("activemq.prefetchSize", "1");
+ stompConnection.subscribe("/queue/" + getQueueName(), "client", headers);
+
+ // send messages using JMS
+ sendMessage("message 1");
+ sendMessage("message 2");
+ sendMessage("message 3");
+
+ StompFrame frame = stompConnection.receive();
+
+ stompConnection.begin("tx1");
+ stompConnection.ack(frame, "tx1");
+
+ StompFrame frame1 = stompConnection.receive();
+
+ try {
+ StompFrame frame2 = stompConnection.receive(500);
+ if (frame2 != null) {
+ fail("Should not have received the second message");
+ }
+ } catch (SocketTimeoutException soe) {}
+ stompDisconnect();
+
+ }
+
protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients =
broker.getBroker().getClients();
int actual = clients.length;