Author: chirino
Date: Fri Jun 30 10:24:01 2006
New Revision: 418336
URL: http://svn.apache.org/viewvc?rev=418336&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQ-786
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java?rev=418336&r1=418335&r2=418336&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java Fri Jun 30 10:24:01 2006
@@ -20,11 +20,14 @@
import java.io.InterruptedIOException;
import org.apache.activemq.command.Response;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
public class FutureResponse {
+ private static final Log log = LogFactory.getLog(FutureResponse.class);
private final ResponseCallback responseCallback;
private final ArrayBlockingQueue responseSlot = new ArrayBlockingQueue(1);
@@ -36,7 +39,12 @@
public Response getResult() throws IOException {
try {
return (Response) responseSlot.take();
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (log.isDebugEnabled()) {
+ log.debug("Operation interupted: " + e, e);
+ }
throw new InterruptedIOException("Interrupted.");
}
}
@@ -49,14 +57,11 @@
}
}
- public void set(Response result) throws InterruptedIOException {
- try {
- responseSlot.put(result);
- } catch (InterruptedException e) {
- throw new InterruptedIOException("Interrupted.");
+ public void set(Response result) {
+ if( responseSlot.offer(result) ) {
+ if( responseCallback !=null ) {
+ responseCallback.onCompletion(this);
+ }
}
- if( responseCallback !=null ) {
- responseCallback.onCompletion(this);
- }
}
}
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=418336&r1=418335&r2=418336&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Fri Jun 30 10:24:01 2006
@@ -17,9 +17,7 @@
package org.apache.activemq.transport;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import org.apache.activemq.command.Command;
@@ -82,16 +80,12 @@
public void onCommand(Command command) {
boolean debug = log.isDebugEnabled();
if( command.isResponse() ) {
- try {
- Response response = (Response) command;
- FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
- if( future!=null ) {
- future.set(response);
- } else {
- if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId());
- }
- } catch (InterruptedIOException e) {
- onException(e);
+ Response response = (Response) command;
+ FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
+ if( future!=null ) {
+ future.set(response);
+ } else {
+ if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId());
}
} else {
getTransportListener().onCommand(command);
@@ -109,12 +103,8 @@
requestMap.clear();
for (Iterator iter = requests.iterator(); iter.hasNext();) {
- try {
- FutureResponse fr = (FutureResponse) iter.next();
- fr.set(new ExceptionResponse(error));
- } catch (InterruptedIOException e) {
- Thread.currentThread().interrupt();
- }
+ FutureResponse fr = (FutureResponse) iter.next();
+ fr.set(new ExceptionResponse(error));
}
super.onException(error);