Author: rajikak
Date: Mon Jul 29 18:48:33 2013
New Revision: 1508174
URL: http://svn.apache.org/r1508174
Log:
Handled shutdown and consumer-cancellation signals in the AMQP polling task, and fixed the test clients.
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPConsumerClient.java
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPProducerClient.java
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java?rev=1508174&r1=1508173&r2=1508174&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java (original)
+++ synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java Mon Jul 29 18:48:33 2013
@@ -13,9 +13,7 @@
*/
package org.apache.synapse.transport.amqp.pollingtask;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.*;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMDocument;
import org.apache.axiom.om.OMElement;
@@ -545,6 +543,12 @@ public class AMQPTransportPollingTask {
} catch (IOException e) {
log.error("I/O error occurs for the polling tasks for service
'" + serviceName +
"'", e);
+ } catch (ShutdownSignalException e) {
+ log.error("Polling task for service '" + serviceName + "'
received a " +
+ "shutdown signal", e);
+ } catch (ConsumerCancelledException e) {
+ log.error("Polling task for service '" + serviceName + "'
received a cancellation " +
+ "signal");
}
}
}
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPConsumerClient.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPConsumerClient.java?rev=1508174&r1=1508173&r2=1508174&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPConsumerClient.java (original)
+++ synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPConsumerClient.java Mon Jul 29 18:48:33 2013
@@ -12,20 +12,29 @@ import java.io.IOException;
*/
public class AMQPConsumerClient {
- public static final String QUEUE_NAME = "ProducerProxy";
+ private static final String QUEUE_NAME = "ProducerProxy";
public static void main(String[] args) throws IOException,
InterruptedException {
+ String queueName;
+
+ if (args.length < 1) {
+ System.out.println("Usage: java AMQPConsumerClient <queue-name>");
+ System.out.println("Default arguments will be used");
+ queueName = QUEUE_NAME;
+ }
+ queueName = args[1];
+
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+ channel.queueDeclare(queueName, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(QUEUE_NAME, true, consumer);
- System.out.println("Waiting for message on queue '" + QUEUE_NAME +
"'");
+ channel.basicConsume(queueName, true, consumer);
+ System.out.println("Waiting for message on queue '" + queueName + "'");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPProducerClient.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPProducerClient.java?rev=1508174&r1=1508173&r2=1508174&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPProducerClient.java
(original)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPProducerClient.java
Mon Jul 29 18:48:33 2013
@@ -11,8 +11,6 @@ import java.io.IOException;
*/
public class AMQPProducerClient {
- // private static final String QUEUE_NAME = "ConsumerTxProxy";
-// private static final String QUEUE_NAME = "worker-queue";
private static final String QUEUE_NAME = "ProducerProxy";
private static final String MESSAGE =
@@ -42,33 +40,67 @@ public class AMQPProducerClient {
public static void main(String[] args) throws IOException {
+ if (args.length < 2) {
+ System.out.println("Usage: java AMQPProducerClient <queue?> " +
+ "<queue|exchange-name> <routing-key>");
+ System.out.println("Default arguments will be used");
+ }
+
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
- //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- AMQPProducerClient.produce(MESSAGE2, channel, QUEUE_NAME);
-// AMQPProducerClient.publish(MESSAGE2, channel, "subscriber-exchange");
-// AMQPProducerClient.route(MESSAGE2, channel, "route-exchange",
"fatal");
-// AMQPProducerClient.route(MESSAGE2, channel, "topic-exchange",
"kern.critical");
+ String queueName = null, exchangeName = null, routingKey = null;
+
+ if ("y".equals(args[0])) {
+ if (args[1] != null) {
+ queueName = args[1];
+ } else {
+ queueName = QUEUE_NAME;
+ }
+ } else {
+ if (args[2] != null) {
+ exchangeName = args[2];
+ if (args[3] != null) {
+ routingKey = args[3];
+ } else {
+ routingKey = "kern.critical";
+ }
+ } else {
+ exchangeName = "subscriber-exchange";
+ }
+ }
+
+ if (queueName != null) {
+ AMQPProducerClient.produce(MESSAGE2, channel, QUEUE_NAME);
+ } else {
+ if (routingKey != null) {
+ AMQPProducerClient.route(MESSAGE2, channel, exchangeName,
routingKey);
+ } else {
+ AMQPProducerClient.publish(MESSAGE2, channel, exchangeName);
+ }
+ }
channel.close();
connection.close();
-
}
- private static void produce(String message, Channel channel, String
queueName) throws IOException {
+ private static void produce(String message, Channel channel, String
queueName)
+ throws IOException {
channel.basicPublish("", queueName, null, message.getBytes());
}
- private static void publish(String message, Channel channel, String
exchangeName) throws IOException {
+ private static void publish(String message, Channel channel, String
exchangeName)
+ throws IOException {
channel.basicPublish(exchangeName, "", null, message.getBytes());
}
- private static void route(String message, Channel channel, String
exchangeName, String routeKey) throws IOException {
+ private static void route(String message, Channel channel, String
exchangeName,
+ String routeKey)
+ throws IOException {
channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
}