Author: gtully
Date: Fri Aug 13 13:55:32 2010
New Revision: 985201

URL: http://svn.apache.org/viewvc?rev=985201&view=rev
Log:
Resolves https://issues.apache.org/activemq/browse/AMQ-2867, as an addition to 
https://issues.apache.org/activemq/browse/AMQ-2507: if no response is received 
and timeout > 0, a RequestTimedOutIOException is thrown on the client side.

Added:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
   (with props)
Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java?rev=985201&r1=985200&r2=985201&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
 Fri Aug 13 13:55:32 2010
@@ -49,7 +49,11 @@ public class FutureResponse {
 
     public Response getResult(int timeout) throws IOException {
         try {
-            return responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
+            Response result = responseSlot.poll(timeout, 
TimeUnit.MILLISECONDS);
+            if (result == null && timeout > 0) {
+                throw new RequestTimedOutIOException();
+            }
+            return result;
         } catch (InterruptedException e) {
             throw new InterruptedIOException("Interrupted.");
         }

Added: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java?rev=985201&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
 (added)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
 Fri Aug 13 13:55:32 2010
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+
+/**
+ * thrown when the timeout specified on a request expires before
+ * a reply or response is received
+ */
+public class RequestTimedOutIOException extends IOException {
+}

Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java?rev=985201&r1=985200&r2=985201&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
 Fri Aug 13 13:55:32 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.bugs;
 
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.DeliveryMode;
@@ -29,6 +30,7 @@ import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.RequestTimedOutIOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -50,7 +52,7 @@ public class JmsTimeoutTest extends Embe
                final ActiveMQConnection cx = 
(ActiveMQConnection)createConnection();
                final ActiveMQDestination queue = 
createDestination("testqueue");
                
-               // we should not take longer than 5 seconds to return from send
+               // we should not take longer than 10 seconds to return from send
                cx.setSendTimeout(10000);
                        
                Runnable r = new Runnable() {
@@ -64,18 +66,61 @@ public class JmsTimeoutTest extends Embe
                            TextMessage message = 
session.createTextMessage(createMessageText());
                            for(int count=0; count<messageCount; count++){
                                producer.send(message);
-                               // Currently after the timeout producer just
-                               // returns but there is no way to know that
-                               // the send timed out
                            }     
                            LOG.info("Done sending..");
-                       } catch (JMSException e) {
-                           e.printStackTrace();
-                           if (e instanceof ResourceAllocationException) {
+                    } catch (JMSException e) {
+                        if (e.getCause() instanceof 
RequestTimedOutIOException) {
                                exceptionCount.incrementAndGet();
+                        } else {
+                            e.printStackTrace();
+                        }
+                           return;
+                       }
+
+                   }
+               };
+               cx.start();
+               Thread producerThread = new Thread(r);
+               producerThread.start();
+               producerThread.join(30000);
+               cx.close();
+               // We should have a few timeout exceptions as memory store will 
fill up
+               assertTrue("No exception from the broker", exceptionCount.get() 
> 0);
+           }
+
+
+        /**
+            * Test the case where the broker is blocked due to a memory limit
+            * with a fail timeout
+            * @throws Exception
+            */
+           public void testBlockedProducerUsageSendFailTimeout() throws 
Exception {
+               final ActiveMQConnection cx = 
(ActiveMQConnection)createConnection();
+               final ActiveMQDestination queue = 
createDestination("testqueue");
+
+            broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
+               Runnable r = new Runnable() {
+                   public void run() {
+                       try {
+                               LOG.info("Sender thread starting");
+                           Session session = cx.createSession(false, 1);
+                           MessageProducer producer = 
session.createProducer(queue);
+                           producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+                           TextMessage message = 
session.createTextMessage(createMessageText());
+                           for(int count=0; count<messageCount; count++){
+                               producer.send(message);
                            }
+                           LOG.info("Done sending..");
+                    } catch (JMSException e) {
+                        if (e instanceof ResourceAllocationException || 
e.getCause() instanceof RequestTimedOutIOException) {
+                               exceptionCount.incrementAndGet();
+                        } else {
+                            e.printStackTrace();
+                        }
                            return;
                        }
+
                    }
                };
                cx.start();
@@ -88,11 +133,12 @@ public class JmsTimeoutTest extends Embe
            }
 
            protected void setUp() throws Exception {
+            exceptionCount.set(0);
                bindAddress = "tcp://localhost:61616";
                broker = createBroker();
                broker.setDeleteAllMessagesOnStartup(true);
                broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
-               broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
+
                super.setUp();
            }
 


Reply via email to