Author: rajdavies
Date: Thu Jul 19 12:24:31 2007
New Revision: 557748

URL: http://svn.apache.org/viewvc?view=rev&rev=557748
Log:
move decision about being a slave from the Broker to the ConnectionContext - so 
can be done on a Connection basis if required

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
 Thu Jul 19 12:24:31 2007
@@ -191,12 +191,7 @@
      * @param messageDispatch
      */
     public void processDispatch(MessageDispatch messageDispatch);
-    
-    /**
-     * @return true if the broker is running as a slave
-     */
-    public boolean isSlaveBroker();
-    
+  
     /**
      * @return true if the broker has stopped
      */
@@ -229,7 +224,7 @@
      * @return true if fault tolerant
      */
     public boolean isFaultTolerantConfiguration();
-
+    
     /**
      * @return the connection context used to make administration operations 
on startup or via JMX MBeans
      */

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
 Thu Jul 19 12:24:31 2007
@@ -200,11 +200,7 @@
     public void processDispatchNotification(MessageDispatchNotification 
messageDispatchNotification) throws Exception{
         next.processDispatchNotification(messageDispatchNotification);
     }
-    
-    public boolean isSlaveBroker(){
-        return next.isSlaveBroker();
-    }
-    
+        
     public boolean isStopped(){
         return next.isStopped();
     }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
 Thu Jul 19 12:24:31 2007
@@ -154,6 +154,7 @@
     private boolean useLocalHostBrokerName = false;
     private CountDownLatch stoppedLatch = new CountDownLatch(1);
     private boolean supportFailOver = false;
+    private boolean clustered = false;
 
     static{
         String localHostName = "localhost";
@@ -1120,6 +1121,20 @@
     public void setSupportFailOver(boolean supportFailOver){
         this.supportFailOver=supportFailOver;
     }    
+    
+    /**
+     * @return the clustered
+     */
+    public boolean isClustered(){
+        return this.clustered;
+    }
+
+    /**
+     * @param clustered the clustered to set
+     */
+    public void setClustered(boolean clustered){
+        this.clustered=clustered;
+    }
 
     // Implementation methods
     // 
-------------------------------------------------------------------------
@@ -1697,6 +1712,5 @@
                 broker.addDestination(adminConnectionContext, destination);
             }
         }
-    }
-   
+    }   
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
 Thu Jul 19 12:24:31 2007
@@ -59,6 +59,7 @@
     private final AtomicBoolean stopping = new AtomicBoolean();
     private final MessageEvaluationContext messageEvaluationContext = new 
MessageEvaluationContext();
        private boolean dontSendReponse;
+    private boolean clientMaster=true;
     
     public ConnectionContext() {
     }
@@ -267,6 +268,29 @@
 
        public boolean isDontSendReponse() {
                return dontSendReponse;
-       }       
-       
+       }
+
+    
+    /**
+     * @return the slave
+     */
+    public boolean isSlave(){
+        return 
(this.broker!=null&&this.broker.getBrokerService().isSlave())||!this.clientMaster;
+    }
+
+    
+    /**
+     * @return the clientMaster
+     */
+    public boolean isClientMaster(){
+        return this.clientMaster;
+    }
+
+    
+    /**
+     * @param clientMaster the clientMaster to set
+     */
+    public void setClientMaster(boolean clientMaster){
+        this.clientMaster=clientMaster;
+    }  
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
 Thu Jul 19 12:24:31 2007
@@ -199,10 +199,7 @@
 
     }
 
-    public boolean isSlaveBroker() {
-        return false;
-    }
-
+    
     public boolean isStopped() {
         return false;
     }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
 Thu Jul 19 12:24:31 2007
@@ -197,10 +197,7 @@
         throw new BrokerStoppedException(this.message);
     }
 
-    public boolean isSlaveBroker() {
-        throw new BrokerStoppedException(this.message);
-    }
-
+   
     public boolean isStopped() {
         return true;
     }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
 Thu Jul 19 12:24:31 2007
@@ -209,10 +209,7 @@
         getNext().processDispatchNotification(messageDispatchNotification);
     }
     
-    public boolean isSlaveBroker(){
-        return getNext().isSlaveBroker();
-    }
-    
+       
     public boolean isStopped(){
         return getNext().isStopped();
     }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 Thu Jul 19 12:24:31 2007
@@ -658,6 +658,7 @@
         context.setClientId(clientId);
         context.setUserName(info.getUserName());
         context.setConnectionId(info.getConnectionId());
+        context.setClientMaster(info.isClientMaster());
         context.setWireFormatInfo(wireFormatInfo);
         context.setNetworkConnection(networkConnection);
         context.incrementReference();
@@ -1199,18 +1200,19 @@
         }
     }
     
-       protected void disposeTransport() {
-       if( transportDisposed.compareAndSet(false, true) ) {
-        try {
-                       transport.stop();
-                       active = false;
-                       log.debug("Stopped connection: 
"+transport.getRemoteAddress());
-               } catch (Exception e) {
-                       log.debug("Could not stop transport: "+e,e);
-               }
-       }
-       }
-       
+       protected void disposeTransport(){
+        if(transportDisposed.compareAndSet(false,true)){
+            try{
+                transport.stop();
+                active=false;
+                log.debug("Stopped connection: "+transport.getRemoteAddress());
+            }catch(Exception e){
+                log.debug("Could not stop transport: "+e,e);
+            }
+        }
+    }
+    
+       
        public int getProtocolVersion() {
                return protocolVersion.get();
        }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
 Thu Jul 19 12:24:31 2007
@@ -115,8 +115,8 @@
     public void gc() {        
     }
     
-    public boolean isSlaveBroker(){
-        return broker.isSlaveBroker();
+    public boolean isSlave(){
+        return getContext().isSlave();
     }
 
     public ConnectionContext getContext() {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 Thu Jul 19 12:24:31 2007
@@ -74,7 +74,7 @@
         // The slave should not deliver pull messages. TODO: when the slave 
becomes a master,
         // He should send a NULL message to all the consumers to 'wake them 
up' in case
         // they were waiting for a message.
-        if(getPrefetchSize()==0&&!isSlaveBroker()){
+        if(getPrefetchSize()==0&&!isSlave()){
             prefetchExtension++;
             final long dispatchCounterBeforePull=dispatchCounter;
             dispatchMatched();
@@ -119,7 +119,7 @@
         pendingEmpty=pending.isEmpty();
         enqueueCounter++;
        
-        if(!isFull()&&pendingEmpty&&!broker.isSlaveBroker()){
+        if(!isFull()&&pendingEmpty&&!isSlave()){
             dispatch(node);
         }else{
             optimizePrefetch();
@@ -260,7 +260,7 @@
         if(callDispatchMatched){
             dispatchMatched();
         }else{
-            if(isSlaveBroker()){
+            if(isSlave()){
                 throw new JMSException("Slave broker out of sync with master: 
Acknowledgment ("+ack
                         +") was not in the dispatch list: "+dispatched);
             }else{
@@ -295,7 +295,7 @@
      * @return
      */
     protected synchronized boolean isFull(){
-        return 
isSlaveBroker()||dispatched.size()-prefetchExtension>=info.getPrefetchSize();
+        return 
isSlave()||dispatched.size()-prefetchExtension>=info.getPrefetchSize();
     }
 
     /**
@@ -377,7 +377,7 @@
     }
 
     protected synchronized void dispatchMatched() throws IOException{
-        if(!broker.isSlaveBroker()){
+        if(!isSlave()){
             try{
                 int numberToDispatch=countBeforeFull();
                 if(numberToDispatch>0){
@@ -412,7 +412,7 @@
             return false;
         }
         // Make sure we can dispatch a message.
-        if(canDispatch(node)&&!isSlaveBroker()){
+        if(canDispatch(node)&&!isSlave()){
             MessageDispatch md=createMessageDispatch(node,message);
             // NULL messages don't count... they don't get Acked.
             if(node!=QueueMessageReference.NULL_MESSAGE){

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
 Thu Jul 19 12:24:31 2007
@@ -111,7 +111,7 @@
     /**
      * @return true if the broker is currently in slave mode
      */
-    boolean isSlaveBroker();
+    boolean isSlave();
     
     /**
      * @return number of messages pending delivery

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 Thu Jul 19 12:24:31 2007
@@ -74,7 +74,7 @@
     public void add(MessageReference node) throws Exception{
         enqueueCounter.incrementAndGet();
         node.incrementReferenceCount();
-        if(!isFull()&&!isSlaveBroker()){
+        if(!isFull()&&!isSlave()){
             optimizePrefetch();
             // if maximumPendingMessages is set we will only discard messages 
which
             // have not been dispatched (i.e. we allow the prefetch buffer to 
be filled)

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
 Thu Jul 19 12:24:31 2007
@@ -36,7 +36,7 @@
     protected BrokerId[] brokerPath;
     protected boolean brokerMasterConnector;
     protected boolean manageable;
-    protected boolean clientMaster;
+    protected boolean clientMaster=true;
     protected transient Object transportContext; 
     
     public ConnectionInfo() {        


Reply via email to