Author: rajikak
Date: Fri Aug  2 00:27:32 2013
New Revision: 1509495

URL: http://svn.apache.org/r1509495
Log:
Added HA (high-availability) implementations.

Modified:
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportEndpoint.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportEndpoint.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportEndpoint.java?rev=1509495&r1=1509494&r2=1509495&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportEndpoint.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportEndpoint.java
 Fri Aug  2 00:27:32 2013
@@ -24,7 +24,6 @@ import org.apache.synapse.transport.amqp
 
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
 /**
@@ -77,7 +76,8 @@ public class AMQPTransportEndpoint exten
                     service,
                     workerPool,
                     this,
-                    conFac);
+                    conFac,
+                    transportReceiver.getHaHandler());
 
         } catch (AMQPTransportException e) {
             throw new AxisFault("Could not load the AMQP endpoint 
configuration, " + e.getMessage(), e);

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java?rev=1509495&r1=1509494&r2=1509495&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
 Fri Aug  2 00:27:32 2013
@@ -41,7 +41,7 @@ public class AMQPTransportListener exten
 
     private ExecutorService connectionFactoryES;
 
-    private AMQPTransportReconnectHandler haHandler;
+    private AMQPTransportReconnectHandler haHandlerTask;
 
     @Override
     protected void doInit() throws AxisFault {
@@ -71,14 +71,14 @@ public class AMQPTransportListener exten
         int maxReconnectionDuration = AMQPTransportUtils.getIntProperty(
                 AMQPTransportConstant.PARAM_MAX_RE_CONNECTION_DURATION, 1000 * 
60 * 10);
 
-        haHandler = new AMQPTransportReconnectHandler(
+        haHandlerTask = new AMQPTransportReconnectHandler(
                 connectionFactoryES,
                 maxReconnectionDuration,
                 reconnectionProgressionFactor,
                 initialReconnectDuration,
                 connectionFactoryManager);
 
-        new Thread(haHandler, "AMQP-HA-handler-task").start();
+        new Thread(haHandlerTask, "AMQP-HA-handler-task").start();
 
         log.info("AMQP transport listener initializing..");
     }
@@ -137,4 +137,8 @@ public class AMQPTransportListener exten
         return connectionFactoryManager.getConnectionFactory(
                 AMQPTransportConstant.DEFAULT_CONNECTION_FACTORY_NAME);
     }
+
+    public AMQPTransportReconnectHandler getHaHandler(){
+        return haHandlerTask;
+    }
 }

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java?rev=1509495&r1=1509494&r2=1509495&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
 Fri Aug  2 00:27:32 2013
@@ -98,8 +98,10 @@ public class AMQPTransportConnectionFact
             } catch (IOException e) {
                 throw new AMQPTransportException("Could not remove the 
connection '" + name + "'", e);
             }
+        } else {
+            throw new AMQPTransportException("No connection factory found with 
the name '"
+                    + name + "'");
         }
-        throw new AMQPTransportException("No connection factory found with the 
name '" + name + "'");
     }
 
     /**

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java?rev=1509495&r1=1509494&r2=1509495&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
 Fri Aug  2 00:27:32 2013
@@ -97,7 +97,7 @@ public class AMQPTransportReconnectHandl
                                     " The retry duration is set to initial 
reconnection duration " +
                                     "value(" + initialReconnectDuration + 
"s)");
                         }
-                        log.error("The reconnection attempt number '" + 
count++ + "' failed. Next " +
+                        log.info("The reconnection attempt number '" + count++ 
+ "' failed. Next " +
                                 "re-try will be after '" + (retryDuration / 
1000) + "' seconds");
                         try {
                             Thread.sleep(retryDuration);
@@ -117,6 +117,7 @@ public class AMQPTransportReconnectHandl
                     connectionFactoryManager.removeConnectionFactory(name);
                     connectionFactoryManager.addConnectionFactory(
                             name, new AMQPTransportConnectionFactory(param, 
es));
+                    log.info("A new connection factory was created for -> '" + 
name + "'");
                 }
 
                 String conFacName = entry.getConnectionFactoryName();
@@ -127,7 +128,6 @@ public class AMQPTransportReconnectHandl
                         new AMQPTransportHABrokerEntry(cf.getChannel(), 
cf.getConnection()));
                 entry.getLock().release();
 
-
                 while (blockedTasks.isEmpty()) {
                     entry = blockedTasks.take();
                     conFacName = entry.getConnectionFactoryName();
@@ -136,13 +136,23 @@ public class AMQPTransportReconnectHandl
                     connectionMap.put(
                             entry.getKey(),
                             new AMQPTransportHABrokerEntry(cf.getChannel(), 
cf.getConnection()));
+                    log.info("The task with key '" + entry.getKey() + "' was 
combined with a new " +
+                            "connection factory");
                     entry.getLock().release();
                 }
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         } catch (AMQPTransportException e) {
-            log.error("High Availability handler just died!. It's time to 
re-start", e);
+            log.error("High Availability handler just died!. It's time to 
reboot the system.", e);
         }
     }
+
+    public BlockingQueue<AMQPTransportHAEntry> getBlockedTasks() {
+        return blockedTasks;
+    }
+
+    public ConcurrentMap<String, AMQPTransportHABrokerEntry> 
getConnectionMap() {
+        return connectionMap;
+    }
 }
\ No newline at end of file

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java?rev=1509495&r1=1509494&r2=1509495&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
 Fri Aug  2 00:27:32 2013
@@ -29,16 +29,17 @@ import org.apache.axis2.transport.http.H
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.transport.amqp.*;
+import org.apache.synapse.transport.amqp.ha.AMQPTransportHABrokerEntry;
+import org.apache.synapse.transport.amqp.ha.AMQPTransportHAEntry;
+import org.apache.synapse.transport.amqp.ha.AMQPTransportReconnectHandler;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.UUID;
+import java.util.concurrent.*;
 
 /**
  * The polling task deploy for each services exposed on AMQP transport. This 
task
@@ -207,6 +208,8 @@ public class AMQPTransportPollingTask {
 
     private ScheduledFuture<?> pollingTaskFuture;
 
+    private AMQPTransportReconnectHandler haHandler;
+
     public void setUseTx(boolean useTx) {
         isUseTx = useTx;
     }
@@ -383,6 +386,10 @@ public class AMQPTransportPollingTask {
         this.responseConnectionFactory = responseConnectionFactory;
     }
 
+    public void setHaHandler(AMQPTransportReconnectHandler haHandler) {
+        this.haHandler = haHandler;
+    }
+
     /**
      * Start the polling task for this service
      */
@@ -507,6 +514,26 @@ public class AMQPTransportPollingTask {
             } catch (ShutdownSignalException e) {
                 log.error("Polling task for service '" + serviceName + "' 
received a " +
                         "shutdown signal", e);
+                Semaphore available = new Semaphore(0, true);
+                String key = UUID.randomUUID().toString();
+                haHandler.getBlockedTasks().add(new AMQPTransportHAEntry(
+                        available, key, connectionFactoryName));
+                try {
+                    available.acquire();
+                } catch (InterruptedException ie) {
+                    log.error("The blocking semaphore received an 
interrupted", e);
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+
+                AMQPTransportHABrokerEntry brokerEntry = 
haHandler.getConnectionMap().get(key);
+                if (brokerEntry == null) {
+                    log.error("No new connection factory were found for key '" 
+ key + "'");
+                } else {
+                    setChannel(brokerEntry.getChannel());
+                    this.queueingConsumer = new QueueingConsumer(channel);
+                }
+
             } catch (ConsumerCancelledException e) {
                 log.error("Polling task for service '" + serviceName + "' 
received a " +
                         "cancellation signal");

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java?rev=1509495&r1=1509494&r2=1509495&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
 Fri Aug  2 00:27:32 2013
@@ -19,6 +19,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.transport.amqp.*;
 import 
org.apache.synapse.transport.amqp.connectionfactory.AMQPTransportConnectionFactory;
+import org.apache.synapse.transport.amqp.ha.AMQPTransportReconnectHandler;
 
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
@@ -36,7 +37,8 @@ public class AMQPTransportPollingTaskFac
             AxisService service,
             ScheduledExecutorService pool,
             AMQPTransportEndpoint endpoint,
-            AMQPTransportConnectionFactory connectionFactory) throws AxisFault 
{
+            AMQPTransportConnectionFactory connectionFactory,
+            AMQPTransportReconnectHandler haHandler) throws AxisFault {
 
         Map<String, String> svcParam =
                 
AMQPTransportUtils.getServiceStringParameters(service.getParameters());
@@ -48,6 +50,7 @@ public class AMQPTransportPollingTaskFac
         pt.setServiceName(service.getName());
         pt.setEndpoint(endpoint);
         pt.setPollingTaskScheduler(pool);
+        pt.setHaHandler(haHandler);
 
         // set buffers to hold request/response messages for this task
         pt.setBuffers(new AMQPTransportBuffers());
@@ -89,7 +92,6 @@ public class AMQPTransportPollingTaskFac
             pt.setInternalExchange(isInternalExchange);
         }
 
-
         pt.setChannel(connectionFactory.getChannel());
         pt.setConnectionFactoryName(connectionFactory.getName());
 


Reply via email to