Author: rajikak
Date: Fri Aug  2 03:46:36 2013
New Revision: 1509528

URL: http://svn.apache.org/r1509528
Log:
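Handled the Ctrl+C signal: the polling task now runs the reconnect logic only
when the ShutdownSignalException is a hard error, and treats any other shutdown
signal as a Ctrl+C shutdown. All scheduled polling futures are kept in a list
so that stop() cancels every one of them.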

Modified:
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java?rev=1509528&r1=1509527&r2=1509528&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
 Fri Aug  2 03:46:36 2013
@@ -37,6 +37,8 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.*;
@@ -206,7 +208,8 @@ public class AMQPTransportPollingTask {
 
     private String configuredContentType = 
AMQPTransportConstant.DEFAULT_CONTENT_TYPE;
 
-    private ScheduledFuture<?> pollingTaskFuture;
+    private List<ScheduledFuture<?>> taskFutureList = new 
ArrayList<ScheduledFuture<?>>();
+
 
     private AMQPTransportReconnectHandler haHandler;
 
@@ -447,11 +450,13 @@ public class AMQPTransportPollingTask {
                 QueueingConsumer consumer = new QueueingConsumer(channel);
                 boolean isAutoAck = isUseTx == true ? false : true; // 
increase readability
                 channel.basicConsume(queueName, isAutoAck, consumer);
-                pollingTaskFuture = 
pollingTaskScheduler.scheduleWithFixedDelay(
+                ScheduledFuture<?> pollingTaskFuture = 
pollingTaskScheduler.scheduleWithFixedDelay(
                         new MessageIOTask(consumer, buffers, isUseTx),
                         scheduledTaskInitialDelay,
                         scheduledTaskDelay,
                         scheduledTaskTimeUnit);
+                taskFutureList.add(pollingTaskFuture);
+
             } catch (IOException e) {
                 handleException(e.getMessage(), e);
             }
@@ -464,10 +469,12 @@ public class AMQPTransportPollingTask {
     }
 
     /**
-     * Stop the polling task
+     * Stop the polling tasks
      */
     public synchronized void stop() {
-        pollingTaskFuture.cancel(false);
+        for (ScheduledFuture<?> pollingTaskFuture : taskFutureList) {
+            pollingTaskFuture.cancel(false);
+        }
     }
 
     /**
@@ -512,26 +519,38 @@ public class AMQPTransportPollingTask {
                 log.error("I/O error occurs for the polling tasks for service 
'" + serviceName +
                         "'", e);
             } catch (ShutdownSignalException e) {
-                log.error("Polling task for service '" + serviceName + "' 
received a " +
-                        "shutdown signal", e);
-                Semaphore available = new Semaphore(0, true);
-                String key = UUID.randomUUID().toString();
-                haHandler.getBlockedTasks().add(new AMQPTransportHAEntry(
-                        available, key, connectionFactoryName));
-                try {
-                    available.acquire();
-                } catch (InterruptedException ie) {
-                    log.error("The blocking semaphore received an 
interrupted", e);
-                    Thread.currentThread().interrupt();
-                    return;
-                }
 
-                AMQPTransportHABrokerEntry brokerEntry = 
haHandler.getConnectionMap().get(key);
-                if (brokerEntry == null) {
-                    log.error("No new connection factory were found for key '" 
+ key + "'");
+                if (e.isHardError()) {
+                    // broker is offline
+                    log.error("Polling task for service '" + serviceName + "' 
received a " +
+                            "shutdown signal", e);
+                    Semaphore available = new Semaphore(0, true);
+                    String key = UUID.randomUUID().toString();
+                    haHandler.getBlockedTasks().add(new AMQPTransportHAEntry(
+                            available, key, connectionFactoryName));
+                    try {
+                        available.acquire();
+                    } catch (InterruptedException ie) {
+                        log.error("The blocking semaphore received an 
interrupted", e);
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+
+                    AMQPTransportHABrokerEntry brokerEntry = 
haHandler.getConnectionMap().get(key);
+                    if (brokerEntry == null) {
+                        log.error("No new connection factory were found for 
key '" + key + "'");
+                    } else {
+                        setChannel(brokerEntry.getChannel());
+                        stop();
+                        try {
+                            start();
+                            log.info("Polling task for service '" + 
serviceName + "' is re-deployed");
+                        } catch (AMQPTransportException ex) {
+                            log.error("Start of polling tasks failed. System 
must be restarted!");
+                        }
+                    }
                 } else {
-                    setChannel(brokerEntry.getChannel());
-                    this.queueingConsumer = new QueueingConsumer(channel);
+                    // this is a shutdown signal for ctrl+c
                 }
 
             } catch (ConsumerCancelledException e) {


Reply via email to