Author: gtully
Date: Fri Nov 13 17:00:14 2009
New Revision: 835920
URL: http://svn.apache.org/viewvc?rev=835920&view=rev
Log:
merge -c 835715 Fix for http://issues.apache.org/activemq/browse/AMQ-2439:
IllegalState issue.
Tightening synchronization in DemandForwardingBridge to avoid a race condition
between subscription cleanup and ack processing.
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=835920&r1=835919&r2=835920&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Fri Nov 13 17:00:14 2009
@@ -89,7 +89,7 @@
* @version $Revision$
*/
public abstract class DemandForwardingBridgeSupport implements NetworkBridge,
BrokerServiceAware {
-
+
private static final Log LOG =
LogFactory.getLog(DemandForwardingBridge.class);
private static final ThreadPoolExecutor ASYNC_TASKS;
protected final Transport localBroker;
@@ -114,7 +114,7 @@
protected ActiveMQDestination[] durableDestinations;
protected final ConcurrentHashMap<ConsumerId, DemandSubscription>
subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId,
DemandSubscription>();
protected final ConcurrentHashMap<ConsumerId, DemandSubscription>
subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId,
DemandSubscription>();
- protected final BrokerId localBrokerPath[] = new BrokerId[] {null};
+ protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
protected CountDownLatch startedLatch = new CountDownLatch(2);
protected CountDownLatch localStartedLatch = new CountDownLatch(1);
protected CountDownLatch remoteBrokerNameKnownLatch = new
CountDownLatch(1);
@@ -125,12 +125,11 @@
final AtomicLong enqueueCounter = new AtomicLong();
final AtomicLong dequeueCounter = new AtomicLong();
-
+
private NetworkBridgeListener networkBridgeListener;
private boolean createdByDuplex;
private BrokerInfo localBrokerInfo;
private BrokerInfo remoteBrokerInfo;
-
private AtomicBoolean started = new AtomicBoolean();
private TransportConnection duplexInitiatingConnection;
@@ -155,7 +154,7 @@
localBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Object o) {
- Command command = (Command)o;
+ Command command = (Command) o;
serviceLocalCommand(command);
}
@@ -166,7 +165,7 @@
remoteBroker.setTransportListener(new TransportListener() {
public void onCommand(Object o) {
- Command command = (Command)o;
+ Command command = (Command) o;
serviceRemoteCommand(command);
}
@@ -290,10 +289,10 @@
localConnectionInfo.setPassword(configuration.getPassword());
Transport originalTransport = remoteBroker;
while (originalTransport instanceof TransportFilter) {
- originalTransport =
((TransportFilter)originalTransport).getNext();
+ originalTransport = ((TransportFilter)
originalTransport).getNext();
}
if (originalTransport instanceof SslTransport) {
- X509Certificate[] peerCerts =
((SslTransport)originalTransport).getPeerCertificates();
+ X509Certificate[] peerCerts = ((SslTransport)
originalTransport).getPeerCertificates();
localConnectionInfo.setTransportContext(peerCerts);
}
localBroker.oneway(localConnectionInfo);
@@ -385,10 +384,10 @@
} finally {
sendShutdown.countDown();
}
-
+
}
});
- if( !sendShutdown.await(10, TimeUnit.SECONDS) ) {
+ if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
LOG.info("Network Could not shutdown in a timely
manner");
}
} finally {
@@ -424,12 +423,12 @@
}
}
- protected void serviceRemoteCommand(Command command) {
+ protected void serviceRemoteCommand(Command command) {
if (!disposed.get()) {
try {
if (command.isMessageDispatch()) {
waitStarted();
- MessageDispatch md = (MessageDispatch)command;
+ MessageDispatch md = (MessageDispatch) command;
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
demandConsumerDispatched++;
if (demandConsumerDispatched >
(demandConsumerInfo.getPrefetchSize() * .75)) {
@@ -438,7 +437,7 @@
}
} else if (command.isBrokerInfo()) {
lastConnectSucceeded.set(true);
- remoteBrokerInfo = (BrokerInfo)command;
+ remoteBrokerInfo = (BrokerInfo) command;
Properties props =
MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
try {
IntrospectionSupport.getProperties(configuration,
props, null);
@@ -463,18 +462,18 @@
// Let the local broker know the remote broker's ID.
localBroker.oneway(command);
} else if (command.getClass() == ConnectionError.class) {
- ConnectionError ce = (ConnectionError)command;
+ ConnectionError ce = (ConnectionError) command;
serviceRemoteException(ce.getException());
} else {
if (isDuplex()) {
if (command.isMessage()) {
- ActiveMQMessage message = (ActiveMQMessage)command;
+ ActiveMQMessage message = (ActiveMQMessage)
command;
if
(AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())) {
serviceRemoteConsumerAdvisory(message.getDataStructure());
} else {
- if
(!isPermissableDestination(message.getDestination(), true)) {
- return;
- }
+ if
(!isPermissableDestination(message.getDestination(), true)) {
+ return;
+ }
if (message.isResponseRequired()) {
Response reply = new Response();
reply.setCorrelationId(message.getCommandId());
@@ -492,20 +491,20 @@
localBroker.oneway(command);
break;
case ConsumerInfo.DATA_STRUCTURE_TYPE:
- localStartedLatch.await();
+ localStartedLatch.await();
if (started.get()) {
if (!addConsumerInfo((ConsumerInfo)
command)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring ConsumerInfo:
"+ command);
+ LOG.debug("Ignoring ConsumerInfo:
" + command);
}
} else {
if (LOG.isTraceEnabled()) {
- LOG.trace("Adding ConsumerInfo: "+
command);
+ LOG.trace("Adding ConsumerInfo: "
+ command);
}
}
} else {
// received a subscription whilst stopping
- LOG.warn("Stopping - ignoring
ConsumerInfo: "+ command);
+ LOG.warn("Stopping - ignoring
ConsumerInfo: " + command);
}
break;
default:
@@ -538,9 +537,9 @@
final int networkTTL = configuration.getNetworkTTL();
if (data.getClass() == ConsumerInfo.class) {
// Create a new local subscription
- ConsumerInfo info = (ConsumerInfo)data;
+ ConsumerInfo info = (ConsumerInfo) data;
BrokerId[] path = info.getBrokerPath();
-
+
if (path != null && path.length >= networkTTL) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring sub
from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops
only : " + info);
@@ -553,7 +552,7 @@
LOG.debug(configuration.getBrokerName() + " Ignoring sub
from " + remoteBrokerName + ", already routed through this broker once : " +
info);
}
return;
- }
+ }
if (!isPermissableDestination(info.getDestination())) {
// ignore if not in the permitted or in the excluded list
if (LOG.isDebugEnabled()) {
@@ -561,10 +560,10 @@
}
return;
}
-
+
// in a cyclic network there can be multiple bridges per broker
that can propagate
// a network subscription so there is a need to synchronise on a
shared entity
- synchronized(brokerService.getVmConnectorURI()) {
+ synchronized (brokerService.getVmConnectorURI()) {
if (addConsumerInfo(info)) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " bridging
sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
@@ -578,7 +577,7 @@
} else if (data.getClass() == DestinationInfo.class) {
// It's a destination info - we want to pass up
// information about temporary destinations
- DestinationInfo destInfo = (DestinationInfo)data;
+ DestinationInfo destInfo = (DestinationInfo) data;
BrokerId[] path = destInfo.getBrokerPath();
if (path != null && path.length >= networkTTL) {
if (LOG.isDebugEnabled()) {
@@ -597,7 +596,7 @@
destInfo.setConnectionId(localConnectionInfo.getConnectionId());
if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
// re-set connection id so comes from here
- ActiveMQTempDestination tempDest =
(ActiveMQTempDestination)destInfo.getDestination();
+ ActiveMQTempDestination tempDest = (ActiveMQTempDestination)
destInfo.getDestination();
tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
}
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),
getRemoteBrokerPath()));
@@ -606,7 +605,7 @@
}
localBroker.oneway(destInfo);
} else if (data.getClass() == RemoveInfo.class) {
- ConsumerId id = (ConsumerId)((RemoveInfo)data).getObjectId();
+ ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id);
}
}
@@ -640,7 +639,7 @@
LOG.debug(configuration.getBrokerName() + " remove local
subscription for remote " + sub.getRemoteInfo().getConsumerId());
}
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
-
+
// continue removal in separate thread to free up this thread for
outstanding responses
ASYNC_TASKS.execute(new Runnable() {
public void run() {
@@ -673,41 +672,46 @@
try {
if (command.isMessageDispatch()) {
enqueueCounter.incrementAndGet();
- final MessageDispatch md = (MessageDispatch)command;
+ final MessageDispatch md = (MessageDispatch) command;
final DemandSubscription sub =
subscriptionMapByLocalId.get(md.getConsumerId());
- if (sub != null && md.getMessage()!=null) {
+ if (sub != null && md.getMessage() != null &&
sub.incrementOutstandingResponses()) {
// See if this consumer's brokerPath tells us it came
from the broker at the other end
// of the bridge. I think we should be making this
decision based on the message's
// broker bread crumbs and not the consumer's?
However, the message's broker bread
// crumbs are null, which is another matter.
boolean cameFromRemote = false;
- Object consumerInfo =
md.getMessage().getDataStructure();
- if( consumerInfo != null && (consumerInfo instanceof
ConsumerInfo) )
- cameFromRemote = contains(
((ConsumerInfo)consumerInfo).getBrokerPath(),remoteBrokerInfo.getBrokerId());
-
+ Object consumerInfo =
md.getMessage().getDataStructure();
+ if (consumerInfo != null && (consumerInfo instanceof
ConsumerInfo))
+ cameFromRemote = contains(((ConsumerInfo)
consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
+
Message message = configureMessage(md);
if (LOG.isDebugEnabled()) {
LOG.debug("bridging " +
configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message);
}
-
+
if (!message.isResponseRequired()) {
// If the message was originally sent using async
// send, we will preserve that QOS
// by bridging it using an async send (small chance
// of message loss).
-
- // Don't send it off to the remote if it
originally came from the remote.
- if( !cameFromRemote ) {
- remoteBroker.oneway(message);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Message not forwarded on to
remote, because message came from remote");
+
+ try {
+ // Don't send it off to the remote if it
originally came from the remote.
+ if (!cameFromRemote) {
+ remoteBroker.oneway(message);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Message not forwarded on to
remote, because message came from remote");
+ }
}
+
+ localBroker.oneway(new MessageAck(md,
MessageAck.INDIVIDUAL_ACK_TYPE, 1));
+ dequeueCounter.incrementAndGet();
+ } finally {
+ sub.decrementOutstandingResponses();
}
- localBroker.oneway(new MessageAck(md,
MessageAck.INDIVIDUAL_ACK_TYPE, 1));
- dequeueCounter.incrementAndGet();
-
+
} else {
// The message was not sent using async send, so we
@@ -719,12 +723,12 @@
try {
Response response = future.getResult();
if (response.isException()) {
- ExceptionResponse er =
(ExceptionResponse)response;
+ ExceptionResponse er =
(ExceptionResponse) response;
serviceLocalException(er.getException());
} else {
localBroker.oneway(new
MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
dequeueCounter.incrementAndGet();
-
+
}
} catch (IOException e) {
serviceLocalException(e);
@@ -735,16 +739,15 @@
};
remoteBroker.asyncRequest(message, callback);
- sub.incrementOutstandingResponses();
}
-
+
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("No subscription registered with this
network bridge for consumerId " + md.getConsumerId() + " for message: " +
md.getMessage());
}
}
} else if (command.isBrokerInfo()) {
- localBrokerInfo = (BrokerInfo)command;
+ localBrokerInfo = (BrokerInfo) command;
serviceLocalBrokerInfo(command);
} else if (command.isShutdownInfo()) {
LOG.info(configuration.getBrokerName() + " Shutting down");
@@ -757,7 +760,7 @@
stop();
}
} else if (command.getClass() == ConnectionError.class) {
- ConnectionError ce = (ConnectionError)command;
+ ConnectionError ce = (ConnectionError) command;
serviceLocalException(ce.getException());
} else {
switch (command.getDataStructureType()) {
@@ -768,7 +771,7 @@
}
}
} catch (Throwable e) {
- LOG.warn("Caught an exception processing local command",e);
+ LOG.warn("Caught an exception processing local command", e);
serviceLocalException(e);
}
}
@@ -783,7 +786,7 @@
/**
* @param dynamicallyIncludedDestinations The
- * dynamicallyIncludedDestinations to set.
+ * dynamicallyIncludedDestinations to set.
*/
public void setDynamicallyIncludedDestinations(ActiveMQDestination[]
dynamicallyIncludedDestinations) {
this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
@@ -812,7 +815,7 @@
/**
* @param staticallyIncludedDestinations The staticallyIncludedDestinations
- * to set.
+ * to set.
*/
public void setStaticallyIncludedDestinations(ActiveMQDestination[]
staticallyIncludedDestinations) {
this.staticallyIncludedDestinations = staticallyIncludedDestinations;
@@ -883,30 +886,30 @@
protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId
idToAppend) {
if (brokerPath == null || brokerPath.length == 0) {
- return new BrokerId[] {idToAppend};
+ return new BrokerId[] { idToAppend };
}
BrokerId rc[] = new BrokerId[brokerPath.length + 1];
System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
rc[brokerPath.length] = idToAppend;
return rc;
}
-
+
protected boolean isPermissableDestination(ActiveMQDestination
destination) {
- return isPermissableDestination(destination, false);
+ return isPermissableDestination(destination, false);
}
protected boolean isPermissableDestination(ActiveMQDestination
destination, boolean allowTemporary) {
// Are we not bridging temp destinations?
if (destination.isTemporary()) {
- if (allowTemporary) {
- return true;
- } else {
- return configuration.isBridgeTempDestinations();
- }
- }
+ if (allowTemporary) {
+ return true;
+ } else {
+ return configuration.isBridgeTempDestinations();
+ }
+ }
final DestinationFilter filter =
DestinationFilter.parseFilter(destination);
-
+
ActiveMQDestination[] dests = excludedDestinations;
if (dests != null && dests.length > 0) {
for (int i = 0; i < dests.length; i++) {
@@ -972,7 +975,7 @@
addRemoteBrokerToBrokerPath(info);
DemandSubscription sub = createDemandSubscription(info);
if (sub != null) {
- if (duplicateSuppressionIsRequired(sub) ) {
+ if (duplicateSuppressionIsRequired(sub)) {
undoMapRegistration(sub);
} else {
addSubscription(sub);
@@ -981,10 +984,10 @@
}
return consumerAdded;
}
-
+
private void undoMapRegistration(DemandSubscription sub) {
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
- subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
+ subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
}
/*
@@ -994,16 +997,16 @@
private boolean duplicateSuppressionIsRequired(DemandSubscription
candidate) {
final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
boolean suppress = false;
-
+
if (consumerInfo.getDestination().isQueue() &&
!configuration.isSuppressDuplicateQueueSubscriptions()) {
return suppress;
}
-
- List<ConsumerId> candidateConsumers =
consumerInfo.getNetworkConsumerIds();
+
+ List<ConsumerId> candidateConsumers =
consumerInfo.getNetworkConsumerIds();
Collection<Subscription> currentSubs =
getRegionSubscriptions(consumerInfo.getDestination().isTopic());
for (Subscription sub : currentSubs) {
- List<ConsumerId> networkConsumers =
sub.getConsumerInfo().getNetworkConsumerIds();
+ List<ConsumerId> networkConsumers =
sub.getConsumerInfo().getNetworkConsumerIds();
if (!networkConsumers.isEmpty()) {
if (matchFound(candidateConsumers, networkConsumers)) {
suppress = hasLowerPriority(sub, candidate.getLocalInfo());
@@ -1014,10 +1017,9 @@
return suppress;
}
-
private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo
candidateInfo) {
boolean suppress = false;
-
+
if (existingSub.getConsumerInfo().getPriority() >=
candidateInfo.getPriority()) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring duplicate
subscription from " + remoteBrokerName
@@ -1029,7 +1031,7 @@
// remove the existing lower priority duplicate and allow this
candidate
try {
removeDuplicateSubscription(existingSub);
-
+
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Replacing
duplicate subscription " + existingSub.getConsumerInfo()
+ " with sub from " + remoteBrokerName
@@ -1037,23 +1039,23 @@
+ candidateInfo.getNetworkConsumerIds());
}
} catch (IOException e) {
- LOG.error("Failed to remove duplicated sub as a result of sub
with higher priority, sub: "+ existingSub, e);
+ LOG.error("Failed to remove duplicated sub as a result of sub
with higher priority, sub: " + existingSub, e);
}
}
return suppress;
}
private void removeDuplicateSubscription(Subscription existingSub) throws
IOException {
- for (NetworkConnector connector: brokerService.getNetworkConnectors())
{
+ for (NetworkConnector connector :
brokerService.getNetworkConnectors()) {
if
(connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId()))
{
break;
}
- }
+ }
}
private boolean matchFound(List<ConsumerId> candidateConsumers,
List<ConsumerId> networkConsumers) {
boolean found = false;
- for (ConsumerId aliasConsumer : networkConsumers) {
+ for (ConsumerId aliasConsumer : networkConsumers) {
if (candidateConsumers.contains(aliasConsumer)) {
found = true;
break;
@@ -1068,21 +1070,20 @@
(isTopic ? region.getTopicRegion() : region.getQueueRegion());
return abstractRegion.getSubscriptions().values();
}
-
+
protected DemandSubscription createDemandSubscription(ConsumerInfo info)
throws IOException {
//add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId());
return doCreateDemandSubscription(info);
}
-
protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info)
throws IOException {
DemandSubscription result = new DemandSubscription(info);
result.getLocalInfo().setConsumerId(new
ConsumerId(localSessionInfo.getSessionId(),
consumerIdGenerator.getNextSequenceId()));
if (info.getDestination().isTemporary()) {
// reset the local connection Id
- ActiveMQTempDestination dest =
(ActiveMQTempDestination)result.getLocalInfo().getDestination();
+ ActiveMQTempDestination dest = (ActiveMQTempDestination)
result.getLocalInfo().getDestination();
dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
}
@@ -1090,7 +1091,7 @@
byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
if (info.getBrokerPath() != null && info.getBrokerPath().length >
1) {
// The longer the path to the consumer, the less it's consumer
priority.
- priority -= info.getBrokerPath().length + 1;
+ priority -= info.getBrokerPath().length + 1;
}
result.getLocalInfo().setPriority(priority);
if (LOG.isDebugEnabled()) {
@@ -1101,19 +1102,19 @@
return result;
}
- final protected DemandSubscription
createDemandSubscription(ActiveMQDestination destination){
+ final protected DemandSubscription
createDemandSubscription(ActiveMQDestination destination) {
ConsumerInfo info = new ConsumerInfo();
info.setDestination(destination);
// the remote info held by the DemandSubscription holds the original
// consumerId,
// the local info get's overwritten
-
+
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
consumerIdGenerator.getNextSequenceId()));
DemandSubscription result = null;
try {
result = createDemandSubscription(info);
} catch (IOException e) {
- LOG.error("Failed to create DemandSubscription ",e);
+ LOG.error("Failed to create DemandSubscription ", e);
}
if (result != null) {
result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
@@ -1132,7 +1133,6 @@
sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
}
-
protected void removeDemandSubscription(ConsumerId id) throws IOException {
DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
if (LOG.isDebugEnabled()) {
@@ -1145,17 +1145,17 @@
}
}
}
-
+
protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId)
{
boolean removeDone = false;
DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
if (sub != null) {
try {
removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
- removeDone = true;
+ removeDone = true;
} catch (IOException e) {
LOG.debug("removeDemandSubscriptionByLocalId failed for
localId: " + consumerId, e);
- }
+ }
}
return removeDone;
}
@@ -1177,7 +1177,6 @@
protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info)
throws IOException;
protected abstract void serviceRemoteBrokerInfo(Command command) throws
IOException;
-
protected abstract BrokerId[] getRemoteBrokerPath();
@@ -1215,17 +1214,17 @@
public long getEnqueueCounter() {
return enqueueCounter.get();
}
-
+
protected boolean isDuplex() {
return configuration.isDuplex() || createdByDuplex;
}
-
+
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
-
+
static {
- ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+ ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "NetworkBridge");
thread.setDaemon(true);
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=835920&r1=835919&r2=835920&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
Fri Nov 13 17:00:14 2009
@@ -33,10 +33,11 @@
*/
public class DemandSubscription {
private static final Log LOG = LogFactory.getLog(DemandSubscription.class);
-
+
private final ConsumerInfo remoteInfo;
private final ConsumerInfo localInfo;
private Set<ConsumerId> remoteSubsIds = new
CopyOnWriteArraySet<ConsumerId>();
+
private AtomicInteger dispatched = new AtomicInteger(0);
private AtomicBoolean activeWaiter = new AtomicBoolean();
@@ -44,7 +45,7 @@
remoteInfo = info;
localInfo = info.copy();
localInfo.setNetworkSubscription(true);
- remoteSubsIds.add(info.getConsumerId());
+ remoteSubsIds.add(info.getConsumerId());
}
/**
@@ -81,7 +82,6 @@
return localInfo;
}
-
/**
* @return Returns the remoteInfo.
*/
@@ -111,13 +111,18 @@
public void decrementOutstandingResponses() {
if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) {
- synchronized(activeWaiter) {
+ synchronized (activeWaiter) {
activeWaiter.notifyAll();
}
}
}
- public void incrementOutstandingResponses() {
- dispatched.incrementAndGet();
+ public boolean incrementOutstandingResponses() {
+ dispatched.incrementAndGet();
+ if (activeWaiter.get()) {
+ decrementOutstandingResponses();
+ return false;
+ }
+ return true;
}
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=835920&r1=835919&r2=835920&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
Fri Nov 13 17:00:14 2009
@@ -61,16 +61,23 @@
}
public FutureResponse asyncRequest(Object o, ResponseCallback
responseCallback) throws IOException {
- Command command = (Command)o;
+ Command command = (Command) o;
command.setCommandId(sequenceGenerator.getNextSequenceId());
command.setResponseRequired(true);
FutureResponse future = new FutureResponse(responseCallback);
+ IOException priorError = null;
synchronized (requestMap) {
- if( this.error !=null ) {
- throw error;
+ priorError = this.error;
+ if (priorError == null) {
+ requestMap.put(new Integer(command.getCommandId()), future);
}
- requestMap.put(new Integer(command.getCommandId()), future);
}
+
+ if (priorError != null) {
+ future.set(new ExceptionResponse(priorError));
+ throw priorError;
+ }
+
next.oneway(command);
return future;
}