Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Wed Jan 18 11:16:58 2006 @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -40,9 +41,9 @@ private final PolicyMap policyMap; - public QueueRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, + public QueueRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter, PolicyMap policyMap) { - super(destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter); + super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter); this.policyMap = policyMap; } @@ -71,10 +72,10 @@ protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { if (info.isBrowser()) { - return new QueueBrowserSubscription(context, info); + return new QueueBrowserSubscription(broker,context, info); } else { - return new QueueSubscription(context, info); + return new QueueSubscription(broker,context, info); } }
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Wed Jan 18 11:16:58 2006 @@ -16,10 +16,7 @@ */ package org.apache.activemq.broker.region; -import java.io.IOException; - -import javax.jms.InvalidSelectorException; - +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.command.ConsumerId; @@ -27,12 +24,14 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.transaction.Synchronization; -import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import javax.jms.InvalidSelectorException; + +import java.io.IOException; public class QueueSubscription extends PrefetchSubscription { - public QueueSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { - super(context, info); + public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { + super(broker,context, info); } public void add(MessageReference node) throws Throwable { Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?rev=370223&r1=370222&r2=370223&view=diff 
============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java Wed Jan 18 11:16:58 2006 @@ -22,6 +22,7 @@ import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.RemoveSubscriptionInfo; /** @@ -86,6 +87,13 @@ * @param context the environment the operation is being executed under. */ public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable; + + /** + * Process a notification of a dispatch - used by a Slave Broker + * @param messageDispatchNotification + * @throws Throwable + */ + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable; public void gc(); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Wed Jan 18 11:16:58 2006 @@ -20,15 +20,19 @@ import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.ConnectionContext; import 
org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.SessionInfo; @@ -46,8 +50,8 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.*; -import java.util.Set; +import java.util.HashMap; +import java.util.Map; /** * Routes Broker operations to the correct messaging regions for processing. @@ -62,22 +66,25 @@ private final Region topicRegion; private final Region tempQueueRegion; private final Region tempTopicRegion; + private BrokerService brokerService; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); private final CopyOnWriteArraySet destinations = new CopyOnWriteArraySet(); + private final CopyOnWriteArrayList brokerInfos = new CopyOnWriteArrayList(); private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); private BrokerId brokerId; private String brokerName; private Map clientIdSet = new HashMap(); // we will synchronize access - public RegionBroker(TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException { - this(taskRunnerFactory, memoryManager, createDefaultPersistenceAdapter(memoryManager), null); + public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws 
IOException { + this(brokerService,taskRunnerFactory, memoryManager, createDefaultPersistenceAdapter(memoryManager), null); } - public RegionBroker(TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, PolicyMap policyMap) throws IOException { + public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, PolicyMap policyMap) throws IOException { + this.brokerService = brokerService; this.sequenceGenerator.setLastSequenceId( adapter.getLastMessageBrokerSequenceId() ); queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, adapter, policyMap); @@ -86,21 +93,28 @@ tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory); tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory); } + + public Broker getAdaptor(Class type){ + if (type.isInstance(this)){ + return this; + } + return null; + } protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) { - return new TempTopicRegion(destinationStatistics, memoryManager, taskRunnerFactory); + return new TempTopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory); } protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) { - return new TempQueueRegion(destinationStatistics, memoryManager, taskRunnerFactory); + return new TempQueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory); } protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter, PolicyMap policyMap) { - return new TopicRegion(destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap); + return new TopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap); } protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, 
PersistenceAdapter adapter, PolicyMap policyMap) { - return new QueueRegion(destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap); + return new QueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap); } private static PersistenceAdapter createDefaultPersistenceAdapter(UsageManager memoryManager) throws IOException { @@ -279,7 +293,6 @@ } public void send(ConnectionContext context, Message message) throws Throwable { - message.getMessageId().setBrokerSequenceId(sequenceGenerator.getNextSequenceId()); ActiveMQDestination destination = message.getDestination(); switch(destination.getDestinationType()) { @@ -385,5 +398,51 @@ protected void throwUnknownDestinationType(ActiveMQDestination destination) throws JMSException { throw new JMSException("Unknown destination type: " + destination.getDestinationType()); } + + public synchronized void addBroker(Connection connection,BrokerInfo info){ + brokerInfos.add(info); + } + + public synchronized void removeBroker(Connection connection,BrokerInfo info){ + if (info != null){ + brokerInfos.remove(info); + } + } + + public synchronized BrokerInfo[] getPeerBrokerInfos(){ + BrokerInfo[] result = new BrokerInfo[brokerInfos.size()]; + result = (BrokerInfo[])brokerInfos.toArray(result); + return result; + } + + public void processDispatch(MessageDispatch messageDispatch){ + + } + + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable { + ActiveMQDestination destination = messageDispatchNotification.getDestination(); + switch(destination.getDestinationType()) { + case ActiveMQDestination.QUEUE_TYPE: + queueRegion.processDispatchNotification(messageDispatchNotification); + break; + case ActiveMQDestination.TOPIC_TYPE: + topicRegion.processDispatchNotification(messageDispatchNotification); + break; + case ActiveMQDestination.TEMP_QUEUE_TYPE: + tempQueueRegion.processDispatchNotification(messageDispatchNotification); 
+ break; + case ActiveMQDestination.TEMP_TOPIC_TYPE: + tempTopicRegion.processDispatchNotification(messageDispatchNotification); + break; + default: + throwUnknownDestinationType(destination); + } + } + + public boolean isSlaveBroker(){ + return brokerService.isSlave(); + } + + } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Wed Jan 18 11:16:58 2006 @@ -22,6 +22,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.filter.MessageEvaluationContext; /** @@ -86,5 +87,16 @@ * reclaim memory. 
*/ void gc(); + + /** + * Used by a Slave Broker to update dispatch information + * @param mdn + */ + void processMessageDispatchNotification(MessageDispatchNotification mdn); + + /** + * @return true if the broker is currently in slave mode + */ + boolean isSlaveBroker(); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Wed Jan 18 11:16:58 2006 @@ -19,6 +19,7 @@ import javax.jms.InvalidSelectorException; import javax.jms.JMSException; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTempDestination; @@ -32,8 +33,8 @@ */ public class TempQueueRegion extends AbstractRegion { - public TempQueueRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) { - super(destinationStatistics, memoryManager, taskRunnerFactory, null); + public TempQueueRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) { + super(broker,destinationStatistics, memoryManager, taskRunnerFactory, null); setAutoCreateDestinations(false); } @@ -55,9 +56,9 @@ protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { if( info.isBrowser() ) { - return new QueueBrowserSubscription(context, info); + return 
new QueueBrowserSubscription(broker,context, info); } else { - return new QueueSubscription(context, info); + return new QueueSubscription(broker,context, info); } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Wed Jan 18 11:16:58 2006 @@ -18,6 +18,7 @@ import javax.jms.JMSException; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTempDestination; @@ -31,8 +32,8 @@ */ public class TempTopicRegion extends AbstractRegion { - public TempTopicRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) { - super(destinationStatistics, memoryManager, taskRunnerFactory, null); + public TempTopicRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) { + super(broker,destinationStatistics, memoryManager, taskRunnerFactory, null); setAutoCreateDestinations(false); } @@ -56,7 +57,7 @@ if( info.isDurable() ) { throw new JMSException("A durable subscription cannot be created for a temporary topic."); } else { - return new TopicSubscription(context, info, this.memoryManager); + return new TopicSubscription(broker,context, info, this.memoryManager); } } Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Wed Jan 18 11:16:58 2006 @@ -18,6 +18,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -50,9 +51,9 @@ protected final ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap(); private final PolicyMap policyMap; - public TopicRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, + public TopicRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter, PolicyMap policyMap) { - super(destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter); + super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter); this.policyMap = policyMap; } @@ -168,7 +169,7 @@ SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName()); DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); if (sub == null) { - sub = new DurableTopicSubscription(context, info); + sub = new DurableTopicSubscription(broker,context, info); durableSubscriptions.put(key, sub); } else { @@ -177,14 
+178,14 @@ return sub; } else { - return new TopicSubscription(context, info, memoryManager); + return new TopicSubscription(broker,context, info, memoryManager); } } public Subscription createDurableSubscription(SubscriptionInfo info) throws JMSException { SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubcriptionName()); DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); - sub = new DurableTopicSubscription(info); + sub = new DurableTopicSubscription(broker,info); durableSubscriptions.put(key, sub); return sub; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed Jan 18 11:16:58 2006 @@ -23,6 +23,7 @@ import javax.jms.InvalidSelectorException; import javax.jms.JMSException; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; @@ -30,6 +31,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.transaction.Synchronization; @@ -41,18 +43,34 @@ protected int dispatched=0; protected int delivered=0; - public TopicSubscription(ConnectionContext context, ConsumerInfo 
info, UsageManager usageManager) throws InvalidSelectorException { - super(context, info); + public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws InvalidSelectorException { + super(broker,context, info); this.usageManager=usageManager; } public void add(MessageReference node) throws InterruptedException, IOException { node.incrementReferenceCount(); - if( !isFull() ) { + if( !isFull() && !isSlaveBroker() ) { dispatch(node); } else { + synchronized(matched){ matched.addLast(node); + } } + } + + public void processMessageDispatchNotification(MessageDispatchNotification mdn){ + synchronized(matched){ + for (Iterator i = matched.iterator(); i.hasNext();){ + MessageReference node = (MessageReference)i.next(); + if (node.getMessageId().equals(mdn.getMessageId())){ + i.remove(); + dispatched++; + node.decrementReferenceCount(); + break; + } + } + } } public void acknowledge(final ConnectionContext context, final MessageAck ack) throws Throwable { Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java Wed Jan 18 11:16:58 2006 @@ -60,6 +60,10 @@ public boolean isMessageAck() { return false; } + + public boolean isMessageDispatchNotification(){ + return false; + } /** * @openwire:property version=1 Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java Wed Jan 18 11:16:58 2006 @@ -42,6 +42,7 @@ boolean isWireFormatInfo(); boolean isMessage(); boolean isMessageAck(); + boolean isMessageDispatchNotification(); Response visit( CommandVisitor visitor) throws Throwable; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java Wed Jan 18 11:16:58 2006 @@ -112,6 +112,14 @@ /////////////////////////////////////////////////// // + // Broker to Broker command objects + // + /////////////////////////////////////////////////// + + byte MESSAGE_DISPATCH_NOTIFICATION = 90; + + /////////////////////////////////////////////////// + // // Data structures contained in the command objects. 
// /////////////////////////////////////////////////// @@ -129,6 +137,8 @@ byte CONSUMER_ID = 122; byte PRODUCER_ID = 123; byte BROKER_ID = 124; + + } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java Wed Jan 18 11:16:58 2006 @@ -76,4 +76,8 @@ return false; } + public boolean isMessageDispatchNotification(){ + return false; + } + } Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java?rev=370223&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java Wed Jan 18 11:16:58 2006 @@ -0,0 +1,90 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.command; + +import org.apache.activemq.state.CommandVisitor; + +/** + * + * @openwire:marshaller code="90" + * @version $Revision$ + */ +public class MessageDispatchNotification extends BaseCommand{ + + public static final byte DATA_STRUCTURE_TYPE=CommandTypes.MESSAGE_DISPATCH_NOTIFICATION; + + protected ConsumerId consumerId; + protected ActiveMQDestination destination; + protected MessageId messageId; + protected long deliverySequenceId; + + public byte getDataStructureType(){ + return DATA_STRUCTURE_TYPE; + } + + public boolean isMessageDispatchNotification(){ + return true; + } + + /** + * @openwire:property version=1 cache=true + */ + public ConsumerId getConsumerId(){ + return consumerId; + } + public void setConsumerId(ConsumerId consumerId){ + this.consumerId=consumerId; + } + + /** + * @openwire:property version=1 cache=true + */ + public ActiveMQDestination getDestination(){ + return destination; + } + public void setDestination(ActiveMQDestination destination){ + this.destination=destination; + } + + /** + * @openwire:property version=1 + */ + + public long getDeliverySequenceId(){ + return deliverySequenceId; + } + public void setDeliverySequenceId(long deliverySequenceId){ + this.deliverySequenceId=deliverySequenceId; + } + + public Response visit(CommandVisitor visitor) throws Throwable { + return visitor.processMessageDispatchNotification( this ); + } + + + /** + * @openwire:property version=1 + */ + public MessageId getMessageId(){ + return messageId; + } + + public void setMessageId(MessageId 
messageId){ + this.messageId=messageId; + } + +} Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java ------------------------------------------------------------------------------ svn:executable = * Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java Wed Jan 18 11:16:58 2006 @@ -169,4 +169,8 @@ return false; } + public boolean isMessageDispatchNotification(){ + return false; + } + } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java Wed Jan 18 11:16:58 2006 @@ -58,8 +58,8 @@ add(new ConnectionErrorMarshaller()); add(new ActiveMQObjectMessageMarshaller()); add(new ConsumerInfoMarshaller()); - add(new ConnectionIdMarshaller()); add(new ActiveMQTempTopicMarshaller()); + add(new ConnectionIdMarshaller()); add(new DiscoveryEventMarshaller()); add(new ConnectionInfoMarshaller()); add(new KeepAliveInfoMarshaller()); @@ 
-74,6 +74,7 @@ add(new ProducerInfoMarshaller()); add(new SubscriptionInfoMarshaller()); add(new ActiveMQMapMessageMarshaller()); + add(new MessageDispatchNotificationMarshaller()); add(new SessionInfoMarshaller()); add(new ActiveMQMessageMarshaller()); add(new TransactionInfoMarshaller()); Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationMarshaller.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationMarshaller.java?rev=370223&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationMarshaller.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationMarshaller.java Wed Jan 18 11:16:58 2006 @@ -0,0 +1,108 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.openwire.v1; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.activemq.openwire.*; +import org.apache.activemq.command.*; + + +/** + * Marshalling code for Open Wire Format for MessageDispatchNotification + * + * + * NOTE!: This file is auto generated - do not modify! + * if you need to make a change, please modify the groovy scripts + * under src/gram/script and then use maven openwire:generate to regenerate + * this file. + * + * @version $Revision$ + */ +public class MessageDispatchNotificationMarshaller extends BaseCommandMarshaller { + + /** + * Return the type of Data Structure we marshal + * @return short representation of the type data structure + */ + public byte getDataStructureType() { + return MessageDispatchNotification.DATA_STRUCTURE_TYPE; + } + + /** + * @return a new object instance + */ + public DataStructure createObject() { + return new MessageDispatchNotification(); + } + + /** + * Un-marshal an object instance from the data input stream + * + * @param o the object to un-marshal + * @param dataIn the data input stream to build the object from + * @throws IOException + */ + public void unmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn, BooleanStream bs) throws IOException { + super.unmarshal(wireFormat, o, dataIn, bs); + + MessageDispatchNotification info = (MessageDispatchNotification)o; + info.setConsumerId((org.apache.activemq.command.ConsumerId) unmarsalCachedObject(wireFormat, dataIn, bs)); + info.setDestination((org.apache.activemq.command.ActiveMQDestination) unmarsalCachedObject(wireFormat, dataIn, bs)); + info.setDeliverySequenceId(unmarshalLong(wireFormat, dataIn, bs)); + info.setMessageId((org.apache.activemq.command.MessageId) unmarsalNestedObject(wireFormat, dataIn, bs)); + + } + + + /** + * Write the booleans that this object uses to a BooleanStream + */ + public int marshal1(OpenWireFormat 
wireFormat, Object o, BooleanStream bs) throws IOException { + + MessageDispatchNotification info = (MessageDispatchNotification)o; + + int rc = super.marshal1(wireFormat, o, bs); + rc += marshal1CachedObject(wireFormat, info.getConsumerId(), bs); + rc += marshal1CachedObject(wireFormat, info.getDestination(), bs); + rc+=marshal1Long(wireFormat, info.getDeliverySequenceId(), bs); + rc += marshal1NestedObject(wireFormat, info.getMessageId(), bs); + + return rc+0; + } + + /** + * Write a object instance to data output stream + * + * @param o the instance to be marshaled + * @param dataOut the output stream + * @throws IOException thrown if an error occurs + */ + public void marshal2(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut, BooleanStream bs) throws IOException { + super.marshal2(wireFormat, o, dataOut, bs); + + MessageDispatchNotification info = (MessageDispatchNotification)o; + marshal2CachedObject(wireFormat, info.getConsumerId(), dataOut, bs); + marshal2CachedObject(wireFormat, info.getDestination(), dataOut, bs); + marshal2Long(wireFormat, info.getDeliverySequenceId(), dataOut, bs); + marshal2NestedObject(wireFormat, info.getMessageId(), dataOut, bs); + + } +} Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java Wed Jan 18 11:16:58 2006 @@ -26,6 +26,7 @@ import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import 
org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; @@ -70,6 +71,7 @@ Response processRecoverTransactions(TransactionInfo info) throws Throwable; Response processForgetTransaction(TransactionInfo info) throws Throwable; Response processEndTransaction(TransactionInfo info) throws Throwable; + Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Throwable; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Wed Jan 18 11:16:58 2006 @@ -30,6 +30,7 @@ import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; @@ -270,6 +271,10 @@ } public Response processFlush(FlushCommand command) throws Throwable { + return null; + } + + public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Throwable{ return null; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=370223&r1=370222&r2=370223&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Jan 18 11:16:58 2006 @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.Random; +import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; import org.apache.activemq.command.Response; import org.apache.activemq.state.ConnectionStateTracker; @@ -69,6 +70,8 @@ private long maxReconnectDelay = 1000 * 30; private long backOffMultiplier = 2; private boolean useExponentialBackOff = true; + private boolean randomize = true; + private boolean initialized; private int maxReconnectAttempts; private int connectFailures; private long reconnectDelay = initialReconnectDelay; @@ -79,6 +82,20 @@ if (command.isResponse()) { requestMap.remove(new Short(((Response) command).getCorrelationId())); } + if (!initialized){ + if (command.isBrokerInfo()){ + BrokerInfo info = (BrokerInfo)command; + BrokerInfo[] peers = info.getPeerBrokerInfos(); + if (peers!= null){ + for (int i =0; i < peers.length;i++){ + String brokerString = peers[i].getBrokerURL(); + add(brokerString); + } + } + initialized = true; + } + + } transportListener.onCommand(command); } @@ -173,6 +190,7 @@ synchronized (reconnectMutex) { log.debug("Transport failed, starting up reconnect task", e); if (connectedTransport != null) { + initialized = false; ServiceSupport.dispose(connectedTransport); connectedTransport = null; connectedTransportURI = null; @@ -256,6 +274,20 @@ this.maxReconnectAttempts = maxReconnectAttempts; } + /** + * @return Returns the randomize. 
+ */ + public boolean isRandomize(){ + return randomize; + } + + /** + * @param randomize The randomize to set. + */ + public void setRandomize(boolean randomize){ + this.randomize=randomize; + } + public void oneway(Command command) throws IOException { Exception error = null; try { @@ -335,6 +367,19 @@ } reconnect(); } + + public void add(String u){ + try { + URI uri = new URI(u); + if (!uris.contains(uri)) + uris.add(uri); + + reconnect(); + }catch(Exception e){ + log.error("Failed to parse URI: " + u); + } + } + public void reconnect() { log.debug("Waking up reconnect task"); @@ -345,17 +390,18 @@ } } - private ArrayList getConnectList() { - ArrayList l = new ArrayList(uris); - - // Randomly, reorder the list by random swapping - Random r = new Random(); - r.setSeed(System.currentTimeMillis()); - for (int i = 0; i < l.size(); i++) { - int p = r.nextInt(l.size()); - Object t = l.get(p); - l.set(p, l.get(i)); - l.set(i, t); + private ArrayList getConnectList(){ + ArrayList l=new ArrayList(uris); + if (randomize){ + // Randomly, reorder the list by random swapping + Random r=new Random(); + r.setSeed(System.currentTimeMillis()); + for (int i=0;i<l.size();i++){ + int p=r.nextInt(l.size()); + Object t=l.get(p); + l.set(p,l.get(i)); + l.set(i,t); + } } return l; } Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java?rev=370223&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java (added) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java Wed Jan 18 11:16:58 2006 @@ -0,0 +1,97 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.ft; + +import java.net.URI; + +import javax.jms.*; +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.xbean.BrokerFactoryBean; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.core.io.ClassPathResource; + +public class FTBrokerTest extends TestCase { + + protected static final int MESSAGE_COUNT = 10; + protected BrokerService master; + protected BrokerService slave; + protected Connection connection; + protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false"; + //protected String uriString = "tcp://localhost:62001"; + + protected void setUp() throws Exception { + BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/master.xml")); + brokerFactory.afterPropertiesSet(); + master = brokerFactory.getBroker(); + brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/slave.xml")); + brokerFactory.afterPropertiesSet(); + slave = brokerFactory.getBroker(); + //uriString = "failover://(" + master.getVmConnectorURI() + "," + slave.getVmConnectorURI() + ")?randomize=false"; + //uriString = "failover://(" + master.getVmConnectorURI() + "," + 
slave.getVmConnectorURI() + ")"; + System.out.println("URI = " + uriString); + URI uri = new URI(uriString); + ConnectionFactory fac = new ActiveMQConnectionFactory(uri); + connection = fac.createConnection(); + master.start(); + slave.start(); + //wait for thing to connect + Thread.sleep(1000); + connection.start(); + super.setUp(); + + + + } + + + + + protected void tearDown() throws Exception { + try { + connection.close(); + slave.stop(); + master.stop(); + }catch(Throwable e){ + e.printStackTrace(); + } + + super.tearDown(); + } + + public void testFTBroker() throws Exception{ + + Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getClass().toString()); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < MESSAGE_COUNT; i++){ + Message msg = session.createTextMessage("test: " + i); + producer.send(msg); + } + master.stop(); + session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < MESSAGE_COUNT; i++){ + System.out.println("GOT MSG: " + consumer.receive(1000)); + } + + } + +} Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java ------------------------------------------------------------------------------ svn:executable = * Added: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml?rev=370223&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml (added) +++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml Wed Jan 18 11:16:58 2006 @@ -0,0 +1,29 @@ +<?xml 
version="1.0" encoding="UTF-8"?> +<!-- + Copyright 2005-2006 The Apache Software Foundation + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://activemq.org/config/1.0"> + + <broker brokerName="master" persistent="false" useJmx="false"> + <transportConnectors> + <transportConnector uri="tcp://localhost:62001"/> + </transportConnectors> + + <persistenceAdapter> + <memoryPersistenceAdapter/> + </persistenceAdapter> + </broker> + +</beans> Propchange: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml ------------------------------------------------------------------------------ svn:executable = * Added: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml?rev=370223&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml (added) +++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml Wed Jan 18 11:16:58 2006 @@ -0,0 +1,31 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Copyright 2005-2006 The Apache Software Foundation + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://activemq.org/config/1.0"> + + <broker brokerName="slave" persistent="false" useJmx="false" masterConnectorURI="tcp://localhost:62001"> + <transportConnectors> + <transportConnector uri="tcp://localhost:62002"/> + </transportConnectors> + + + + <persistenceAdapter> + <memoryPersistenceAdapter/> + </persistenceAdapter> + </broker> + +</beans> Propchange: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml ------------------------------------------------------------------------------ svn:executable = *
