Author: rajikak
Date: Thu Aug  1 22:03:03 2013
New Revision: 1509452

URL: http://svn.apache.org/r1509452
Log:
Added initial HA (high-availability) implementations.

Added:
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHABrokerEntry.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHAEntry.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
Modified:
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportBuffers.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactory.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPTransportUtilsTest.java

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportBuffers.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportBuffers.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportBuffers.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportBuffers.java
 Thu Aug  1 22:03:03 2013
@@ -94,7 +94,7 @@ public class AMQPTransportBuffers {
             // block if there is no messages
             return requestBuffer.take();
         } catch (InterruptedException e) {
-            // ignore
+            Thread.currentThread().interrupt();
         }
         return null;
     }

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java
 Thu Aug  1 22:03:03 2013
@@ -164,29 +164,29 @@ public final class AMQPTransportConstant
      * If a polling task encounter an exception due to some reason(most 
probably due to broker
      * outage) the number of milliseconds it should be suspended before next 
re-try.
      */
-    public static final String PARAMETER_INITIAL_RE_CONNECTION_DURATION =
-            "transport.amqp.InitialReconnectDuration";
+    public static final String PARAM_INITIAL_RE_CONNECTION_DURATION =
+            "initial-reconnect-duration";
 
     /**
      * If the polling task fails again after the initial re-connection duration
-     * {@link AMQPTransportConstant#PARAMETER_INITIAL_RE_CONNECTION_DURATION}
+     * {@link AMQPTransportConstant#PARAM_INITIAL_RE_CONNECTION_DURATION}
      * next suspend duration will be calculated using this
-     * (PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR * 
PARAMETER_INITIAL_RE_CONNECTION_DURATION).
+     * (PARAM_RE_CONNECTION_PROGRESSION_FACTOR * 
PARAM_INITIAL_RE_CONNECTION_DURATION).
      */
-    public static final String PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR =
-            "transport.amqp.ReconnectionProgressionFactor";
+    public static final String PARAM_RE_CONNECTION_PROGRESSION_FACTOR =
+            "reconnection-progression-factor";
 
 
     /**
      * The maximum duration to suspend the polling task in case of an error. 
The current suspend
      * duration will reach this
      * value by following the series,
-     * PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR * 
PARAMETER_INITIAL_RE_CONNECTION_DURATION.
+     * PARAM_RE_CONNECTION_PROGRESSION_FACTOR * 
PARAM_INITIAL_RE_CONNECTION_DURATION.
      * This upper bound is there
      * because nobody wants to wait a long time until the next re-try if the 
broker is alive.
      */
-    public static final String PARAMETER_MAX_RE_CONNECTION_DURATION =
-            "transport.amqp.MaximumReconnectionDuration";
+    public static final String PARAM_MAX_RE_CONNECTION_DURATION =
+            "maximum-reconnection-duration";
 
 
     /**

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
 Thu Aug  1 22:03:03 2013
@@ -17,6 +17,7 @@ import org.apache.axis2.AxisFault;
 import org.apache.axis2.transport.base.AbstractTransportListenerEx;
 import 
org.apache.synapse.transport.amqp.connectionfactory.AMQPTransportConnectionFactory;
 import 
org.apache.synapse.transport.amqp.connectionfactory.AMQPTransportConnectionFactoryManager;
+import org.apache.synapse.transport.amqp.ha.AMQPTransportReconnectHandler;
 import org.apache.synapse.transport.amqp.pollingtask.AMQPTransportPollingTask;
 
 import java.util.concurrent.ExecutorService;
@@ -40,6 +41,8 @@ public class AMQPTransportListener exten
 
     private ExecutorService connectionFactoryES;
 
+    private AMQPTransportReconnectHandler haHandler;
+
     @Override
     protected void doInit() throws AxisFault {
 
@@ -58,6 +61,25 @@ public class AMQPTransportListener exten
                 
AMQPTransportUtils.getIntProperty(AMQPTransportConstant.PARAM_WORKER_POOL_SIZE,
                         AMQPTransportConstant.WORKER_POOL_DEFAULT));
 
+
+        int initialReconnectDuration = AMQPTransportUtils.getIntProperty(
+                AMQPTransportConstant.PARAM_INITIAL_RE_CONNECTION_DURATION, 
1000);
+
+        double reconnectionProgressionFactor = 
AMQPTransportUtils.getDoubleProperty(
+                AMQPTransportConstant.PARAM_RE_CONNECTION_PROGRESSION_FACTOR, 
2.0);
+
+        int maxReconnectionDuration = AMQPTransportUtils.getIntProperty(
+                AMQPTransportConstant.PARAM_MAX_RE_CONNECTION_DURATION, 1000 * 
60 * 10);
+
+        haHandler = new AMQPTransportReconnectHandler(
+                connectionFactoryES,
+                maxReconnectionDuration,
+                reconnectionProgressionFactor,
+                initialReconnectDuration,
+                connectionFactoryManager);
+
+        new Thread(haHandler, "AMQP-HA-handler-task").start();
+
         log.info("AMQP transport listener initializing..");
     }
 

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactory.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactory.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactory.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactory.java
 Thu Aug  1 22:03:03 2013
@@ -26,7 +26,10 @@ import org.apache.synapse.transport.amqp
 import org.apache.synapse.transport.amqp.AMQPTransportUtils;
 
 import java.io.IOException;
-import java.util.Hashtable;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
@@ -65,7 +68,7 @@ public class AMQPTransportConnectionFact
     /**
      * The list of parameters(see above) in the connection factory definition.
      */
-    private Hashtable<String, String> parameters = new Hashtable<String, 
String>();
+    private Map<String, String> parameters = new HashMap<String, String>();
 
     /**
      * The AMQP connection to the broker maintain per connection factory.
@@ -77,6 +80,21 @@ public class AMQPTransportConnectionFact
      */
     private Channel channel = null;
 
+    public AMQPTransportConnectionFactory(
+            Map<String, String> parameters,
+            ExecutorService es)
+            throws AMQPTransportException {
+        try {
+            connection = createConnection(es, parameters);
+            channel = createChannel(connection, parameters);
+        } catch (Exception e) {
+            String msg = "Could not initialize the connection factory with 
parameters\n";
+            for (Map.Entry entry : parameters.entrySet()) {
+                msg = msg + entry.getKey() + ":" + entry.getValue() + "\n";
+            }
+            throw new AMQPTransportException(msg, e);
+        }
+    }
 
     public AMQPTransportConnectionFactory(Parameter parameter, ExecutorService 
es)
             throws AMQPTransportException {
@@ -87,7 +105,7 @@ public class AMQPTransportConnectionFact
 
             if (!(parameter.getValue() instanceof OMElement)) {
                 throw new AMQPTransportException("The connection factory '" + 
parameter.getName() +
-                        "' is in valid. It's required to have the least 
connection factory definition with '" +
+                        "' is invalid. It's required to have the least 
connection factory definition with '" +
                         AMQPTransportConstant.PARAMETER_CONNECTION_URI + "' 
parameter. Example: \n" +
                         "\n<transportReceiver name=\"amqp\" 
class=\"org.wso2.carbon.transports.amqp.AMQPTransportListener\">\n" +
                         "   <parameter name=\"default\" locked=\"false\">\n" +
@@ -106,60 +124,8 @@ public class AMQPTransportConnectionFact
                 parameters.put(entry.getName(), (String) entry.getValue());
             }
 
-            ConnectionFactory connectionFactory = new ConnectionFactory();
-            
connectionFactory.setUri(parameters.get(AMQPTransportConstant.PARAMETER_CONNECTION_URI));
-
-            if (parameters.get(AMQPTransportConstant.PARAMETER_BROKER_LIST) != 
null) {
-                Address[] addresses = AMQPTransportUtils.getAddressArray(
-                        
parameters.get(AMQPTransportConstant.PARAMETER_BROKER_LIST), ",", ':');
-                connection = connectionFactory.newConnection(es, addresses);
-            } else {
-                connection = connectionFactory.newConnection(es);
-            }
-
-            if 
(parameters.get(AMQPTransportConstant.PARAMETER_AMQP_CHANNEL_NUMBER) != null) {
-                int index = 0;
-                try {
-                    index = Integer.parseInt(parameters.get(
-                            
AMQPTransportConstant.PARAMETER_AMQP_CHANNEL_NUMBER));
-                } catch (NumberFormatException e) {
-                    index = 1; // assume default,
-                    // fair dispatch see 
http://www.rabbitmq.com/tutorials/tutorial-two-java.html
-                }
-                channel = connection.createChannel(index);
-
-            } else {
-                channel = connection.createChannel();
-            }
-
-
-            int prefetchSize = 1024;
-            if 
(parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_PREFETCH_SIZE) != null) 
{
-                try {
-                    prefetchSize = Integer.parseInt(
-                            
parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_PREFETCH_SIZE));
-                } catch (NumberFormatException e) {
-                    prefetchSize = 1024; // assume default
-                }
-            }
-
-            int prefetchCount = 0;
-            if 
(parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_PREFETCH_COUNT) != 
null) {
-                try {
-                    prefetchCount = Integer.parseInt(
-                            
parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_PREFETCH_COUNT));
-                    channel.basicQos(prefetchCount);
-                } catch (NumberFormatException e) {
-                    prefetchCount = 0; // assume default
-                }
-            }
-
-            boolean useGlobally = false;
-            if 
(parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_QOS_GLOBAL) != null) {
-                useGlobally = Boolean.parseBoolean(parameters.get(
-                        AMQPTransportConstant.PARAMETER_CHANNEL_QOS_GLOBAL));
-            }
-
+            connection = createConnection(es, parameters);
+            channel = createChannel(connection, parameters);
 
         } catch (Exception e) {
             throw new AMQPTransportException("" +
@@ -199,6 +165,14 @@ public class AMQPTransportConnectionFact
     }
 
     /**
+     * Get the AMQP connection maintained by this connection factory.
+     * @return the connection to the broker.
+     */
+    public Connection getConnection() {
+        return connection;
+    }
+
+    /**
      * Return the name of this connection factory(the name given in axis2.xml)
      *
      * @return name of this connection factory
@@ -225,4 +199,64 @@ public class AMQPTransportConnectionFact
     public Map<String, String> getParameters() {
         return parameters;
     }
-}
+
+    private Connection createConnection(ExecutorService es, Map<String, 
String> parameters)
+            throws IOException, URISyntaxException, NoSuchAlgorithmException, 
KeyManagementException {
+        ConnectionFactory connectionFactory = new ConnectionFactory();
+        
connectionFactory.setUri(parameters.get(AMQPTransportConstant.PARAMETER_CONNECTION_URI));
+
+        if (parameters.get(AMQPTransportConstant.PARAMETER_BROKER_LIST) != 
null) {
+            Address[] addresses = AMQPTransportUtils.getAddressArray(
+                    
parameters.get(AMQPTransportConstant.PARAMETER_BROKER_LIST), ",", ':');
+            return connectionFactory.newConnection(es, addresses);
+        }
+        return connectionFactory.newConnection(es);
+    }
+
+    private Channel createChannel(Connection connection, Map<String, String> 
parameters)
+            throws IOException {
+        Channel ch;
+        if 
(parameters.get(AMQPTransportConstant.PARAMETER_AMQP_CHANNEL_NUMBER) != null) {
+            int index = 0;
+            try {
+                index = Integer.parseInt(parameters.get(
+                        AMQPTransportConstant.PARAMETER_AMQP_CHANNEL_NUMBER));
+            } catch (NumberFormatException e) {
+                index = 1; // assume default,
+                // fair dispatch see 
http://www.rabbitmq.com/tutorials/tutorial-two-java.html
+            }
+            ch = connection.createChannel(index);
+
+        } else {
+            ch = connection.createChannel();
+        }
+
+        int prefetchSize = 1024;
+        if 
(parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_PREFETCH_SIZE) != null) 
{
+            try {
+                prefetchSize = Integer.parseInt(
+                        
parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_PREFETCH_SIZE));
+            } catch (NumberFormatException e) {
+                prefetchSize = 1024; // assume default
+            }
+        }
+
+        int prefetchCount = 0;
+        if 
(parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_PREFETCH_COUNT) != 
null) {
+            try {
+                prefetchCount = Integer.parseInt(
+                        
parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_PREFETCH_COUNT));
+                ch.basicQos(prefetchCount);
+            } catch (NumberFormatException e) {
+                prefetchCount = 0; // assume default
+            }
+        }
+
+        boolean useGlobally = false;
+        if (parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_QOS_GLOBAL) 
!= null) {
+            useGlobally = Boolean.parseBoolean(parameters.get(
+                    AMQPTransportConstant.PARAMETER_CHANNEL_QOS_GLOBAL));
+        }
+        return ch;
+    }
+}
\ No newline at end of file

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
 Thu Aug  1 22:03:03 2013
@@ -41,8 +41,8 @@ public class AMQPTransportConnectionFact
      * Add the list of defined connection factories definition.
      *
      * @param transportInDescription The connection factory definition in 
axis2.xml
-     * @param es An instance of java.util.concurrent.ExecutorService to use 
with AMQP connection
-     *           factory
+     * @param es                     An instance of 
java.util.concurrent.ExecutorService to use with AMQP connection
+     *                               factory
      */
     public void addConnectionFactories(ParameterInclude 
transportInDescription, ExecutorService es) {
         for (Parameter p : transportInDescription.getParameters()) {
@@ -63,6 +63,10 @@ public class AMQPTransportConnectionFact
         factories.put(parameter.getName(), new 
AMQPTransportConnectionFactory(parameter, es));
     }
 
+    public void addConnectionFactory(String name, 
AMQPTransportConnectionFactory cf) {
+        factories.put(name, cf);
+    }
+
     /**
      * Get the connection factory with this name.
      *
@@ -112,4 +116,9 @@ public class AMQPTransportConnectionFact
             throw new AMQPTransportException("Error occurred whiling shutting 
down connections", e);
         }
     }
+
+    public ConcurrentHashMap<String, AMQPTransportConnectionFactory> 
getAllFactories() {
+        return factories;
+    }
+
 }

Added: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHABrokerEntry.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHABrokerEntry.java?rev=1509452&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHABrokerEntry.java
 (added)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHABrokerEntry.java
 Thu Aug  1 22:03:03 2013
@@ -0,0 +1,51 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.synapse.transport.amqp.ha;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+
+public class AMQPTransportHABrokerEntry {
+
+    private Channel channel;
+
+    private Connection connection;
+
+    public AMQPTransportHABrokerEntry(Channel channel, Connection connection) {
+        this.channel = channel;
+        this.connection = connection;
+    }
+
+
+    public Channel getChannel() {
+        return channel;
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+}

Added: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHAEntry.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHAEntry.java?rev=1509452&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHAEntry.java
 (added)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHAEntry.java
 Thu Aug  1 22:03:03 2013
@@ -0,0 +1,56 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.synapse.transport.amqp.ha;
+
+import java.util.concurrent.Semaphore;
+
+public class AMQPTransportHAEntry {
+
+    private Semaphore lock;
+
+    private String key;
+
+    private String connFacName;
+
+    public AMQPTransportHAEntry(Semaphore lock, String key, String 
connFacName) {
+        this.lock = lock;
+        this.key = key;
+        this.connFacName = connFacName;
+    }
+
+    public Semaphore getLock() {
+        return lock;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public String getConnectionFactoryName() {
+        return connFacName;
+    }
+}

Added: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java?rev=1509452&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
 (added)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
 Thu Aug  1 22:03:03 2013
@@ -0,0 +1,148 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.synapse.transport.amqp.ha;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.transport.amqp.AMQPTransportException;
+import 
org.apache.synapse.transport.amqp.connectionfactory.AMQPTransportConnectionFactory;
+import 
org.apache.synapse.transport.amqp.connectionfactory.AMQPTransportConnectionFactoryManager;
+
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * Responsible for handling the shutdown signals gracefully. For example,
+ * this provides the functionality for reconnecting to the broker if the
+ * broker went offline. The reconnection attempts happen in an exponential
+ * back-off fashion.
+ */
+public class AMQPTransportReconnectHandler implements Runnable {
+
+    private BlockingQueue<AMQPTransportHAEntry> blockedTasks =
+            new LinkedBlockingQueue<AMQPTransportHAEntry>();
+
+    private ConcurrentMap<String, AMQPTransportHABrokerEntry> connectionMap =
+            new ConcurrentHashMap<String, AMQPTransportHABrokerEntry>();
+
+    private AMQPTransportConnectionFactoryManager connectionFactoryManager;
+
+    private int initialReconnectDuration = 1000;
+
+    private double reconnectionProgressionFactor = 2.0;
+
+    private int maxReconnectionDuration = 1000 * 60 * 10;
+
+    private ExecutorService es;
+
+    public AMQPTransportReconnectHandler(ExecutorService es,
+                                         int maxReconnectionDuration,
+                                         double reconnectionProgressionFactor,
+                                         int initialReconnectDuration,
+                                         AMQPTransportConnectionFactoryManager
+                                                 connectionFactoryManager) {
+        this.es = es;
+        this.maxReconnectionDuration = maxReconnectionDuration;
+        this.reconnectionProgressionFactor = reconnectionProgressionFactor;
+        this.initialReconnectDuration = initialReconnectDuration;
+        this.connectionFactoryManager = connectionFactoryManager;
+    }
+
+    private static Log log = 
LogFactory.getLog(AMQPTransportReconnectHandler.class);
+
+    public void run() {
+        try {
+            AMQPTransportHAEntry entry = blockedTasks.take();
+            if (entry != null) {
+                Map<String, String> params = connectionFactoryManager.
+                        
getConnectionFactory(entry.getConnectionFactoryName()).getParameters();
+                int count = 1;
+                long retryDuration = initialReconnectDuration;
+
+                while (true) {
+                    try {
+                        Thread.sleep(initialReconnectDuration);
+                        new AMQPTransportConnectionFactory(params, es);
+                        log.info("The reconnection attempt '" + count + "' was 
successful");
+                        break;
+                    } catch (AMQPTransportException e) {
+                        retryDuration = (long) (retryDuration * 
reconnectionProgressionFactor);
+                        if (retryDuration > maxReconnectionDuration) {
+                            retryDuration = initialReconnectDuration;
+                            log.info("The retry duration exceeded the maximum 
reconnection duration." +
+                                    " The retry duration is set to initial 
reconnection duration " +
+                                    "value(" + initialReconnectDuration + 
"s)");
+                        }
+                        log.error("The reconnection attempt number '" + 
count++ + "' failed. Next " +
+                                "re-try will be after '" + (retryDuration / 
1000) + "' seconds");
+                        try {
+                            Thread.sleep(retryDuration);
+                        } catch (InterruptedException ignore) {
+                            // we need to block
+                        }
+                    }
+                }
+
+                ConcurrentHashMap<String, AMQPTransportConnectionFactory> 
allFac =
+                        connectionFactoryManager.getAllFactories();
+
+                for (Map.Entry me : allFac.entrySet()) {
+                    String name = (String) me.getKey();
+                    Map<String, String> param = 
((AMQPTransportConnectionFactory)
+                            me.getValue()).getParameters();
+                    connectionFactoryManager.removeConnectionFactory(name);
+                    connectionFactoryManager.addConnectionFactory(
+                            name, new AMQPTransportConnectionFactory(param, 
es));
+                }
+
+                String conFacName = entry.getConnectionFactoryName();
+                AMQPTransportConnectionFactory cf = connectionFactoryManager.
+                        getConnectionFactory(conFacName);
+                connectionMap.put(
+                        entry.getKey(),
+                        new AMQPTransportHABrokerEntry(cf.getChannel(), 
cf.getConnection()));
+                entry.getLock().release();
+
+
+                while (blockedTasks.isEmpty()) {
+                    entry = blockedTasks.take();
+                    conFacName = entry.getConnectionFactoryName();
+                    cf = connectionFactoryManager.
+                            getConnectionFactory(conFacName);
+                    connectionMap.put(
+                            entry.getKey(),
+                            new AMQPTransportHABrokerEntry(cf.getChannel(), 
cf.getConnection()));
+                    entry.getLock().release();
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } catch (AMQPTransportException e) {
+            log.error("High Availability handler just died!. It's time to 
re-start", e);
+        }
+    }
+}
\ No newline at end of file

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
 Thu Aug  1 22:03:03 2013
@@ -151,25 +151,6 @@ public class AMQPTransportPollingTask {
     private int noOfConcurrentConsumers = 2;
 
     /**
-     * Initial duration(in milliseconds) to suspend the polling task in case of an error.
-     * {@link org.apache.synapse.transport.amqp.AMQPTransportConstant#PARAMETER_INITIAL_RE_CONNECTION_DURATION}.
-     */
-    private int initialReconnectDuration = 1000;
-
-    /**
-     * The progression factor for next re-try calculation.
-     * {@link AMQPTransportConstant#PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR}
-     */
-    private double reconnectionFactor = 2.0;
-
-    /**
-     * The maximum duration to suspend the polling task. This is to make sure there is an upper
-     * bound for the suspending the polling task in case of an error.
-     * {@link AMQPTransportConstant#PARAMETER_MAX_RE_CONNECTION_DURATION}
-     */
-    private int maxReconnectionDuration = 1000 * 60 * 10;
-
-    /**
      * The name of the connectionFactory this service is bound to.
      * {@link AMQPTransportConstant#PARAMETER_CONNECTION_FACTORY_NAME}
      */
@@ -302,18 +283,6 @@ public class AMQPTransportPollingTask {
         this.noOfConcurrentConsumers = noOfConcurrentConsumers;
     }
 
-    public void setInitialReconnectDuration(int initialReconnectDuration) {
-        this.initialReconnectDuration = initialReconnectDuration;
-    }
-
-    public void setReconnectionFactor(double reconnectionFactor) {
-        this.reconnectionFactor = reconnectionFactor;
-    }
-
-    public void setMaxReconnectionDuration(int maxReconnectionDuration) {
-        this.maxReconnectionDuration = maxReconnectionDuration;
-    }
-
     public void setConnectionFactoryName(String connectionFactoryName) {
         this.connectionFactoryName = connectionFactoryName;
     }
@@ -394,22 +363,6 @@ public class AMQPTransportPollingTask {
         return noOfConcurrentConsumers;
     }
 
-    public int getInitialReconnectDuration() {
-        return initialReconnectDuration;
-    }
-
-    public double getReconnectionFactor() {
-        return reconnectionFactor;
-    }
-
-    public int getMaxReconnectionDuration() {
-        return maxReconnectionDuration;
-    }
-
-    public String getConnectionFactoryName() {
-        return connectionFactoryName;
-    }
-
     public TimeUnit getScheduledTaskTimeUnit() {
         return scheduledTaskTimeUnit;
     }

Modified: synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java (original)
+++ synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java Thu Aug  1 22:03:03 2013
@@ -168,28 +168,6 @@ public class AMQPTransportPollingTaskFac
         }
 
         try {
-            Integer initialReconectionDuration = AMQPTransportUtils.getOptionalIntParameter(
-                    AMQPTransportConstant.PARAMETER_INITIAL_RE_CONNECTION_DURATION,
-                    svcParam, conFacParam);
-            if (initialReconectionDuration != null) {
-                pt.setInitialReconnectDuration(initialReconectionDuration);
-            }
-        } catch (AMQPTransportException e) {
-            throw new AxisFault("Could not assign the initial re-connection duration", e);
-        }
-
-        try {
-            Integer reconnectionFactor = AMQPTransportUtils.getOptionalIntParameter(
-                    AMQPTransportConstant.PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR,
-                    svcParam, conFacParam);
-            if (reconnectionFactor != null) {
-                pt.setReconnectionFactor(reconnectionFactor);
-            }
-        } catch (AMQPTransportException e) {
-            throw new AxisFault("Could not assign reconnection factor", e);
-        }
-
-        try {
             Integer dispatchingTask = AMQPTransportUtils.getOptionalIntParameter(
                     AMQPTransportConstant.PARAMETER_DISPATCHING_TASK_SIZE,
                     svcParam, conFacParam);
@@ -251,9 +229,6 @@ public class AMQPTransportPollingTaskFac
                     "Is queue restricted: '" + pt.isQueueRestricted() + "'\n" +
                     "Is queue auto deleted: '" + pt.isQueueAutoDelete() + "'\n" +
                     "Is blocking mode: '" + pt.isBlockingMode() + "'\n" +
-                    "Initial re-connection duration: '" + pt.getInitialReconnectDuration() + "(ms)'\n" +
-                    "Re-connection progression factor: '" + pt.getReconnectionFactor() + "'\n" +
-                    "Maximum re-connection duration: '" + pt.getMaxReconnectionDuration() + "'\n" +
                     "Number of concurrent consumers: '" + pt.getNoOfConcurrentConsumers() + "'\n" +
                     "Number of dispatching task: '" + pt.getNoOfDispatchingTask() + "'");
         }

Modified: synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPTransportUtilsTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPTransportUtilsTest.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPTransportUtilsTest.java (original)
+++ synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPTransportUtilsTest.java Thu Aug  1 22:03:03 2013
@@ -40,7 +40,7 @@ public class AMQPTransportUtilsTest exte
 
         cfMap.put(AMQPTransportConstant.PARAMETER_EXCHANGE_TYPE, "direct");
         cfMap.put(AMQPTransportConstant.PARAMETER_QUEUE_DURABLE, "true");
-        cfMap.put(AMQPTransportConstant.PARAMETER_INITIAL_RE_CONNECTION_DURATION, "10");
+        cfMap.put(AMQPTransportConstant.PARAM_INITIAL_RE_CONNECTION_DURATION, "10");
     }
 
     public void testGetStringProperty() throws Exception {
@@ -109,9 +109,9 @@ public class AMQPTransportUtilsTest exte
 
     public void testGetOptionalIntParameter() throws Exception {
         assertEquals("Invalid value",
-                Integer.parseInt(cfMap.get(AMQPTransportConstant.PARAMETER_INITIAL_RE_CONNECTION_DURATION)),
+                Integer.parseInt(cfMap.get(AMQPTransportConstant.PARAM_INITIAL_RE_CONNECTION_DURATION)),
                 AMQPTransportUtils.getOptionalIntParameter(
-                        AMQPTransportConstant.PARAMETER_INITIAL_RE_CONNECTION_DURATION, svcMap, cfMap).intValue());
+                        AMQPTransportConstant.PARAM_INITIAL_RE_CONNECTION_DURATION, svcMap, cfMap).intValue());
 
         assertEquals("Invalid value",
                 Integer.parseInt(svcMap.get(AMQPTransportConstant.PARAMETER_NO_OF_CONCURRENT_CONSUMERS)),


Reply via email to