Author: rajdavies
Date: Sun Feb 19 14:14:33 2006
New Revision: 378968
URL: http://svn.apache.org/viewcvs?rev=378968&view=rev
Log:
fix some issues with assembly tests
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=378968&r1=378967&r2=378968&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Sun Feb 19 14:14:33 2006
@@ -45,6 +45,14 @@
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info){
+
+ if (addToAlreadyInterestedConsumers(info)){
+ return null; //don't want this subscription added
+ }
+ return doCreateDemandSubscription(info);
+ }
+
+ protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info){
//search through existing subscriptions and see if we have a match
boolean matched = false;
DestinationFilter filter=DestinationFilter.parseFilter(info.getDestination());
@@ -57,18 +65,7 @@
//continue - we want interest to any existing DemandSubscriptions
}
}
- if (matched){
- return null; //don't want this subscription added
- }
- //not matched so create a new one
- //but first, if it's durable - changed set the
- //ConsumerId here - so it won't be removed if the
- //durable subscriber goes away on the other end
- if (info.isDurable() || (info.getDestination().isQueue() && !info.getDestination().isTemporary())){
- info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
- .getNextSequenceId()));
- }
- return super.createDemandSubscription(info);
+ return matched;
}
protected void removeDemandSubscription(ConsumerId id) throws IOException{
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=378968&r1=378967&r2=378968&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java Sun Feb 19 14:14:33 2006
@@ -40,7 +40,6 @@
remoteInfo=info;
localInfo=info.copy();
localInfo.setBrokerPath(info.getBrokerPath());
- localInfo.setNetworkSubscription(true);
remoteSubsIds.add(info.getConsumerId());
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?rev=378968&r1=378967&r2=378968&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java Sun Feb 19 14:14:33 2006
@@ -1,28 +1,27 @@
/**
- *
+ *
* Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.network;
import java.io.IOException;
+import java.util.Iterator;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-
/**
* Consolidates subscriptions
*
@@ -30,29 +29,37 @@
*/
public class DurableConduitBridge extends ConduitBridge{
static final private Log log=LogFactory.getLog(DurableConduitBridge.class);
+
/**
* Constructor
+ *
* @param localBroker
* @param remoteBroker
*/
public DurableConduitBridge(Transport localBroker,Transport remoteBroker){
super(localBroker,remoteBroker);
}
-
+
/**
* Subscriptions for these desitnations are always created
- * @throws IOException
- *
+ *
*/
- protected void setupStaticDestinations() throws IOException{
+ protected void setupStaticDestinations(){
super.setupStaticDestinations();
ActiveMQDestination[] dests=durableDestinations;
if(dests!=null){
for(int i=0;i<dests.length;i++){
ActiveMQDestination dest=dests[i];
- if(isPermissableDestination(dest)){
+ if(isPermissableDestination(dest) && !doesConsumerExist(dest)){
DemandSubscription sub=createDemandSubscription(dest);
- addSubscription(sub);
+ if(dest.isTopic()){
+ sub.getLocalInfo().setSubcriptionName(getLocalBrokerName());
+ }
+ try{
+ addSubscription(sub);
+ }catch(IOException e){
+ log.error("Failed to add static destination "+dest,e);
+ }
if(log.isTraceEnabled())
log.trace("Forwarding messages for durable destination: "+dest);
}
@@ -60,4 +67,32 @@
}
}
+ protected DemandSubscription createDemandSubscription(ConsumerInfo info){
+ if(addToAlreadyInterestedConsumers(info)){
+ return null; // don't want this subscription added
+ }
+ // not matched so create a new one
+ // but first, if it's durable - changed set the
+ // ConsumerId here - so it won't be removed if the
+ // durable subscriber goes away on the other end
+ if(info.isDurable()||(info.getDestination().isQueue()&&!info.getDestination().isTemporary())){
+ info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator.getNextSequenceId()));
+ }
+ if(info.isDurable()){
+ // set the subscriber name to something reproducable
+ info.setSubcriptionName(getLocalBrokerName());
+ }
+ return doCreateDemandSubscription(info);
+ }
+
+ protected boolean doesConsumerExist(ActiveMQDestination dest){
+ DestinationFilter filter=DestinationFilter.parseFilter(dest);
+ for(Iterator i=subscriptionMapByLocalId.values().iterator();i.hasNext();){
+ DemandSubscription ds=(DemandSubscription) i.next();
+ if(filter.matches(ds.getLocalInfo().getDestination())){
+ return true;
+ }
+ }
+ return false;
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=378968&r1=378967&r2=378968&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Sun Feb 19 14:14:33 2006
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Iterator;
import java.util.Set;
import org.apache.activemq.Service;
@@ -82,6 +83,10 @@
public void stop() throws Exception {
this.discoveryAgent.stop();
+ for (Iterator i = bridges.values().iterator();i.hasNext();){
+ Bridge bridge = (Bridge)i.next();
+ bridge.stop();
+ }
}
public void onServiceAdd(DiscoveryEvent event) {