Author: aco
Date: Wed Mar 8 19:59:03 2006
New Revision: 384419
URL: http://svn.apache.org/viewcvs?rev=384419&view=rev
Log:
Use offer() to wait (with a timeout of 30 seconds) for space when the queue is full.
Modified:
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
Modified:
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java?rev=384419&r1=384418&r2=384419&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java (original)
+++ incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java Wed Mar 8 19:59:03 2006
@@ -18,6 +18,7 @@
import edu.emory.mathcs.backport.java.util.Queue;
import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.TransportSupport;
@@ -35,6 +36,8 @@
* @version $Revision$
*/
public class BlockingQueueTransport extends TransportSupport {
+ public static final long MAX_TIMEOUT = 30000L;
+
private BlockingQueue queue;
public BlockingQueueTransport(BlockingQueue channel) {
@@ -46,7 +49,13 @@
}
public void oneway(Command command) throws IOException {
- queue.add(command);
+ try {
+ boolean success = queue.offer(command, MAX_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!success)
+ throw new IOException("Fail to add to BlockingQueue. Add timed out after " + MAX_TIMEOUT + "ms: size=" + queue.size());
+ } catch (InterruptedException e) {
+ throw new IOException("Fail to add to BlockingQueue. Interrupted while waiting for space: size=" + queue.size());
+ }
}
protected void doStart() throws Exception {