Author: rajikak
Date: Fri Aug 2 03:46:36 2013
New Revision: 1509528
URL: http://svn.apache.org/r1509528
Log:
Handle the Ctrl+C shutdown signal in the AMQP transport polling task.
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java?rev=1509528&r1=1509527&r2=1509528&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
(original)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
Fri Aug 2 03:46:36 2013
@@ -37,6 +37,8 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
@@ -206,7 +208,8 @@ public class AMQPTransportPollingTask {
private String configuredContentType =
AMQPTransportConstant.DEFAULT_CONTENT_TYPE;
- private ScheduledFuture<?> pollingTaskFuture;
+ private List<ScheduledFuture<?>> taskFutureList = new
ArrayList<ScheduledFuture<?>>();
+
private AMQPTransportReconnectHandler haHandler;
@@ -447,11 +450,13 @@ public class AMQPTransportPollingTask {
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean isAutoAck = isUseTx == true ? false : true; //
increase readability
channel.basicConsume(queueName, isAutoAck, consumer);
- pollingTaskFuture =
pollingTaskScheduler.scheduleWithFixedDelay(
+ ScheduledFuture<?> pollingTaskFuture =
pollingTaskScheduler.scheduleWithFixedDelay(
new MessageIOTask(consumer, buffers, isUseTx),
scheduledTaskInitialDelay,
scheduledTaskDelay,
scheduledTaskTimeUnit);
+ taskFutureList.add(pollingTaskFuture);
+
} catch (IOException e) {
handleException(e.getMessage(), e);
}
@@ -464,10 +469,12 @@ public class AMQPTransportPollingTask {
}
/**
- * Stop the polling task
+ * Stop the polling tasks
*/
public synchronized void stop() {
- pollingTaskFuture.cancel(false);
+ for (ScheduledFuture<?> pollingTaskFuture : taskFutureList) {
+ pollingTaskFuture.cancel(false);
+ }
}
/**
@@ -512,26 +519,38 @@ public class AMQPTransportPollingTask {
log.error("I/O error occurs for the polling tasks for service
'" + serviceName +
"'", e);
} catch (ShutdownSignalException e) {
- log.error("Polling task for service '" + serviceName + "'
received a " +
- "shutdown signal", e);
- Semaphore available = new Semaphore(0, true);
- String key = UUID.randomUUID().toString();
- haHandler.getBlockedTasks().add(new AMQPTransportHAEntry(
- available, key, connectionFactoryName));
- try {
- available.acquire();
- } catch (InterruptedException ie) {
- log.error("The blocking semaphore received an
interrupted", e);
- Thread.currentThread().interrupt();
- return;
- }
- AMQPTransportHABrokerEntry brokerEntry =
haHandler.getConnectionMap().get(key);
- if (brokerEntry == null) {
- log.error("No new connection factory were found for key '"
+ key + "'");
+ if (e.isHardError()) {
+ // broker is offline
+ log.error("Polling task for service '" + serviceName + "'
received a " +
+ "shutdown signal", e);
+ Semaphore available = new Semaphore(0, true);
+ String key = UUID.randomUUID().toString();
+ haHandler.getBlockedTasks().add(new AMQPTransportHAEntry(
+ available, key, connectionFactoryName));
+ try {
+ available.acquire();
+ } catch (InterruptedException ie) {
+ log.error("The blocking semaphore received an
interrupted", e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+
+ AMQPTransportHABrokerEntry brokerEntry =
haHandler.getConnectionMap().get(key);
+ if (brokerEntry == null) {
+ log.error("No new connection factory were found for
key '" + key + "'");
+ } else {
+ setChannel(brokerEntry.getChannel());
+ stop();
+ try {
+ start();
+ log.info("Polling task for service '" +
serviceName + "' is re-deployed");
+ } catch (AMQPTransportException ex) {
+ log.error("Start of polling tasks failed. System
must be restarted!");
+ }
+ }
} else {
- setChannel(brokerEntry.getChannel());
- this.queueingConsumer = new QueueingConsumer(channel);
+ // this is a shutdown signal for ctrl+c
}
} catch (ConsumerCancelledException e) {