Author: rajdavies
Date: Tue Sep  2 04:48:21 2008
New Revision: 691206

URL: http://svn.apache.org/viewvc?rev=691206&view=rev
Log:
Apply patch for https://issues.apache.org/activemq/browse/AMQ-596

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/resources/activemq.xsd

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Sep  2 04:48:21 2008
@@ -102,7 +102,7 @@
  * @version $Revision: 1.1 $
  */
 public class BrokerService implements Service {
-
+       protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
     public static final String DEFAULT_PORT = "61616";
     public static final String LOCAL_HOST_NAME;
     public static final String DEFAULT_BROKER_NAME = "localhost";
@@ -117,6 +117,8 @@
     private boolean useShutdownHook = true;
     private boolean useLoggingForShutdownErrors;
     private boolean shutdownOnMasterFailure;
+    private boolean shutdownOnSlaveFailure;
+    private boolean waitForSlave;
     private String brokerName = DEFAULT_BROKER_NAME;
     private File dataDirectoryFile;
     private File tmpDataDirectory;
@@ -1820,6 +1822,19 @@
         return context;
     }
 
+    protected void waitForSlave(){
+        try {
+               slaveStartSignal.await();
+        }catch(InterruptedException e){
+               LOG.error("Exception waiting for slave:"+e);
+        }
+    }
+    
+    protected void slaveConnectionEstablished(){
+       slaveStartSignal.countDown();
+    }
+    
+    
     /**
      * Start all transport and network connections, proxies and bridges
      * 
@@ -1847,7 +1862,9 @@
             map.put("network", "true");
             map.put("async", "false");
             uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
-
+            if(isWaitForSlave()){
+               waitForSlave();
+            }
             for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
                 NetworkConnector connector = iter.next();
                 connector.setLocalUri(uri);
@@ -1984,4 +2001,24 @@
         this.sslContext = sslContext;
     }
 
+       public boolean isShutdownOnSlaveFailure() {
+               return shutdownOnSlaveFailure;
+       }
+
+       public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
+               this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
+       }
+
+       public boolean isWaitForSlave() {
+               return waitForSlave;
+       }
+
+       public void setWaitForSlave(boolean waitForSlave) {
+               this.waitForSlave = waitForSlave;
+       }
+
+       public CountDownLatch getSlaveStartSignal() {
+               return slaveStartSignal;
+       }
+
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Sep  2 04:48:21 2008
@@ -87,6 +87,7 @@
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.util.ServiceSupport;
 import org.apache.activemq.util.URISupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -208,6 +209,20 @@
     }
 
     public void serviceTransportException(IOException e) {
+       BrokerService bService=connector.getBrokerService();
+       if(bService.isShutdownOnSlaveFailure()){
+               if(brokerInfo!=null){
+                       if(brokerInfo.isSlaveBroker()){
+                               LOG.error("Slave has exception: " + e.getMessage()+" shutting down master now.", e);
+                           try {
+                               broker.stop();
+                               bService.stop();
+                               }catch(Exception ex){
+                               LOG.warn("Failed to stop the master",ex);
+                           }
+                       }
+               }
+       }
         if (!stopping.get()) {
             transportException.set(e);
             if (TRANSPORTLOG.isDebugEnabled()) {
@@ -601,7 +616,11 @@
     }
 
     public Response processAddConnection(ConnectionInfo info) throws Exception {
-        
+       //if the broker service has slave attached, wait for the slave to be attached to allow client connection. slave connection is fine
+       if(!info.isBrokerMasterConnector()&&connector.getBrokerService().isWaitForSlave()&&connector.getBrokerService().getSlaveStartSignal().getCount()==1){
+                       ServiceSupport.dispose(transport);
+                       return new ExceptionResponse(new Exception("Master's slave not attached yet."));
+       }
         // Older clients should have been defaulting this field to true.. but they were not. 
         if( wireFormatInfo!=null && wireFormatInfo.getVersion() <= 2 ) {
             info.setClientMaster(true);
@@ -1129,6 +1148,9 @@
             masterBroker = new MasterBroker(parent, transport);
             masterBroker.startProcessing();
             LOG.info("Slave Broker " + info.getBrokerName() + " is attached");
+            BrokerService bService=connector.getBrokerService();
+            bService.slaveConnectionEstablished();
+            
         } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
             // so this TransportConnection is the rear end of a network bridge
             // We have been requested to create a two way pipe ...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Tue Sep  2 04:48:21 2008
@@ -394,4 +394,8 @@
     public Broker getBroker() {
         return broker;
     }
+
+       public BrokerService getBrokerService() {
+               return brokerService;
+       }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java Tue Sep  2 04:48:21 2008
@@ -139,7 +139,7 @@
         } catch (Exception e) {
             masterActive.set(false);
             LOG.error("Failed to start network bridge: " + e, e);
-        }    
+        }   
     }
 
     protected void startBridge() throws Exception {
@@ -148,10 +148,8 @@
         connectionInfo.setClientId(idGenerator.generateId());
         connectionInfo.setUserName(userName);
         connectionInfo.setPassword(password);
+        connectionInfo.setBrokerMasterConnector(true);
         localBroker.oneway(connectionInfo);
-        ConnectionInfo remoteInfo = new ConnectionInfo();
-        connectionInfo.copy(remoteInfo);
-        remoteInfo.setBrokerMasterConnector(true);
         remoteBroker.oneway(connectionInfo);
         sessionInfo = new SessionInfo(connectionInfo, 1);
         localBroker.oneway(sessionInfo);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Tue Sep  2 04:48:21 2008
@@ -392,7 +392,7 @@
                     try {
 
                         context.getBroker().addDestination(context, destination);
-                        // dest = addDestination(context, destination);
+                        dest = addDestination(context, destination);
                     } catch (DestinationAlreadyExistsException e) {
                         // if the destination already exists then lets ignore
                         // this error

Modified: activemq/trunk/activemq-core/src/main/resources/activemq.xsd
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/activemq.xsd?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/activemq.xsd (original)
+++ activemq/trunk/activemq-core/src/main/resources/activemq.xsd Tue Sep  2 04:48:21 2008
@@ -813,6 +813,8 @@
       <xs:attribute name='producerSystemUsagePortion' type='xs:integer'/>
       <xs:attribute name='regionBroker' type='xs:string'/>
       <xs:attribute name='shutdownOnMasterFailure' type='xs:boolean'/>
+      <xs:attribute name='shutdownOnSlaveFailure' type='xs:boolean'/>
+      <xs:attribute name='waitForSlave' type='xs:boolean'/>
       <xs:attribute name='splitSystemUsageForProducersConsumers' type='xs:boolean'/>
       <xs:attribute name='sslContext' type='xs:string'/>
       <xs:attribute name='start' type='xs:boolean'>


Reply via email to