Author: gtully
Date: Fri Aug 13 13:55:32 2010
New Revision: 985201
URL: http://svn.apache.org/viewvc?rev=985201&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2867, addition to
https://issues.apache.org/activemq/browse/AMQ-2507 - if no response received
and timeout > 0 a RequestTimedOutIOException is thrown on the client side
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java?rev=985201&r1=985200&r2=985201&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
Fri Aug 13 13:55:32 2010
@@ -49,7 +49,11 @@ public class FutureResponse {
public Response getResult(int timeout) throws IOException {
try {
- return responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
+ Response result = responseSlot.poll(timeout,
TimeUnit.MILLISECONDS);
+ if (result == null && timeout > 0) {
+ throw new RequestTimedOutIOException();
+ }
+ return result;
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted.");
}
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java?rev=985201&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
(added)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
Fri Aug 13 13:55:32 2010
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+
+/**
+ * thrown when the timeout specified on a request expires before
+ * a reply or response is received
+ */
+public class RequestTimedOutIOException extends IOException {
+}
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java?rev=985201&r1=985200&r2=985201&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
Fri Aug 13 13:55:32 2010
@@ -17,6 +17,7 @@
package org.apache.activemq.bugs;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.DeliveryMode;
@@ -29,6 +30,7 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.RequestTimedOutIOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,7 +52,7 @@ public class JmsTimeoutTest extends Embe
final ActiveMQConnection cx =
(ActiveMQConnection)createConnection();
final ActiveMQDestination queue =
createDestination("testqueue");
- // we should not take longer than 5 seconds to return from send
+ // we should not take longer than 10 seconds to return from send
cx.setSendTimeout(10000);
Runnable r = new Runnable() {
@@ -64,18 +66,61 @@ public class JmsTimeoutTest extends Embe
TextMessage message =
session.createTextMessage(createMessageText());
for(int count=0; count<messageCount; count++){
producer.send(message);
- // Currently after the timeout producer just
- // returns but there is no way to know that
- // the send timed out
}
LOG.info("Done sending..");
- } catch (JMSException e) {
- e.printStackTrace();
- if (e instanceof ResourceAllocationException) {
+ } catch (JMSException e) {
+ if (e.getCause() instanceof
RequestTimedOutIOException) {
exceptionCount.incrementAndGet();
+ } else {
+ e.printStackTrace();
+ }
+ return;
+ }
+
+ }
+ };
+ cx.start();
+ Thread producerThread = new Thread(r);
+ producerThread.start();
+ producerThread.join(30000);
+ cx.close();
+ // We should have a few timeout exceptions as memory store will
fill up
+ assertTrue("No exception from the broker", exceptionCount.get()
> 0);
+ }
+
+
+ /**
+ * Test the case where the broker is blocked due to a memory limit
+ * with a fail timeout
+ * @throws Exception
+ */
+ public void testBlockedProducerUsageSendFailTimeout() throws
Exception {
+ final ActiveMQConnection cx =
(ActiveMQConnection)createConnection();
+ final ActiveMQDestination queue =
createDestination("testqueue");
+
+ broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ LOG.info("Sender thread starting");
+ Session session = cx.createSession(false, 1);
+ MessageProducer producer =
session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ TextMessage message =
session.createTextMessage(createMessageText());
+ for(int count=0; count<messageCount; count++){
+ producer.send(message);
}
+ LOG.info("Done sending..");
+ } catch (JMSException e) {
+ if (e instanceof ResourceAllocationException ||
e.getCause() instanceof RequestTimedOutIOException) {
+ exceptionCount.incrementAndGet();
+ } else {
+ e.printStackTrace();
+ }
return;
}
+
}
};
cx.start();
@@ -88,11 +133,12 @@ public class JmsTimeoutTest extends Embe
}
protected void setUp() throws Exception {
+ exceptionCount.set(0);
bindAddress = "tcp://localhost:61616";
broker = createBroker();
broker.setDeleteAllMessagesOnStartup(true);
broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
- broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
+
super.setUp();
}