Author: rajikak
Date: Mon Jul 29 18:48:33 2013
New Revision: 1508174

URL: http://svn.apache.org/r1508174
Log:
handled the shutdown signal + fixed the clients.

Modified:
    
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPConsumerClient.java
    
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPProducerClient.java

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java?rev=1508174&r1=1508173&r2=1508174&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java (original)
+++ synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java Mon Jul 29 18:48:33 2013
@@ -13,9 +13,7 @@
  */
 package org.apache.synapse.transport.amqp.pollingtask;
 
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.*;
 import org.apache.axiom.om.OMAbstractFactory;
 import org.apache.axiom.om.OMDocument;
 import org.apache.axiom.om.OMElement;
@@ -545,6 +543,12 @@ public class AMQPTransportPollingTask {
             } catch (IOException e) {
                 log.error("I/O error occurs for the polling tasks for service '" + serviceName +
                         "'", e);
+            } catch (ShutdownSignalException e) {
+                log.error("Polling task for service '" + serviceName + "' received a " +
+                        "shutdown signal", e);
+            } catch (ConsumerCancelledException e) {
+                log.error("Polling task for service '" + serviceName + "' received a cancellation " +
+                        "signal");
             }
         }
     }

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPConsumerClient.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPConsumerClient.java?rev=1508174&r1=1508173&r2=1508174&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPConsumerClient.java (original)
+++ synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPConsumerClient.java Mon Jul 29 18:48:33 2013
@@ -12,20 +12,29 @@ import java.io.IOException;
  */
 public class AMQPConsumerClient {
 
-    public static final String QUEUE_NAME = "ProducerProxy";
+    private static final String QUEUE_NAME = "ProducerProxy";
 
     public static void main(String[] args) throws IOException, InterruptedException {
 
+        String queueName;
+
+        if (args.length < 1) {
+            System.out.println("Usage: java AMQPConsumerClient <queue-name>");
+            System.out.println("Default arguments will be used");
+            queueName = QUEUE_NAME;
+        }
+        queueName = args[1];
+
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost("localhost");
         Connection connection = factory.newConnection();
 
         Channel channel = connection.createChannel();
-        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        channel.queueDeclare(queueName, false, false, false, null);
 
         QueueingConsumer consumer = new QueueingConsumer(channel);
-        channel.basicConsume(QUEUE_NAME, true, consumer);
-        System.out.println("Waiting for message on queue '" + QUEUE_NAME + "'");
+        channel.basicConsume(queueName, true, consumer);
+        System.out.println("Waiting for message on queue '" + queueName + "'");
 
         while (true) {
             QueueingConsumer.Delivery delivery = consumer.nextDelivery();

Modified: 
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPProducerClient.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPProducerClient.java?rev=1508174&r1=1508173&r2=1508174&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPProducerClient.java (original)
+++ synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPProducerClient.java Mon Jul 29 18:48:33 2013
@@ -11,8 +11,6 @@ import java.io.IOException;
  */
 public class AMQPProducerClient {
 
-    //    private static final String QUEUE_NAME = "ConsumerTxProxy";
-//    private static final String QUEUE_NAME = "worker-queue";
     private static final String QUEUE_NAME = "ProducerProxy";
 
     private static final String MESSAGE =
@@ -42,33 +40,67 @@ public class AMQPProducerClient {
 
     public static void main(String[] args) throws IOException {
 
+        if (args.length < 2) {
+            System.out.println("Usage: java AMQPProducerClient <queue?> " +
+                    "<queue|exchange-name> <routing-key>");
+            System.out.println("Default arguments will be used");
+        }
+
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost("localhost");
         Connection connection = factory.newConnection();
 
         Channel channel = connection.createChannel();
 
-        //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-        AMQPProducerClient.produce(MESSAGE2, channel, QUEUE_NAME);
-//        AMQPProducerClient.publish(MESSAGE2, channel, "subscriber-exchange");
-//        AMQPProducerClient.route(MESSAGE2, channel, "route-exchange", "fatal");
-//        AMQPProducerClient.route(MESSAGE2, channel, "topic-exchange", "kern.critical");
+        String queueName = null, exchangeName = null, routingKey = null;
+
+        if ("y".equals(args[0])) {
+            if (args[1] != null) {
+                queueName = args[1];
+            } else {
+                queueName = QUEUE_NAME;
+            }
+        } else {
+            if (args[2] != null) {
+                exchangeName = args[2];
+                if (args[3] != null) {
+                    routingKey = args[3];
+                } else {
+                    routingKey = "kern.critical";
+                }
+            } else {
+                exchangeName = "subscriber-exchange";
+            }
+        }
+
+        if (queueName != null) {
+            AMQPProducerClient.produce(MESSAGE2, channel, QUEUE_NAME);
+        } else {
+            if (routingKey != null) {
+                AMQPProducerClient.route(MESSAGE2, channel, exchangeName, routingKey);
+            } else {
+                AMQPProducerClient.publish(MESSAGE2, channel, exchangeName);
+            }
+        }
 
         channel.close();
         connection.close();
-
     }
 
 
-    private static void produce(String message, Channel channel, String queueName) throws IOException {
+    private static void produce(String message, Channel channel, String queueName)
+            throws IOException {
         channel.basicPublish("", queueName, null, message.getBytes());
     }
 
-    private static void publish(String message, Channel channel, String exchangeName) throws IOException {
+    private static void publish(String message, Channel channel, String exchangeName)
+            throws IOException {
         channel.basicPublish(exchangeName, "", null, message.getBytes());
     }
 
-    private static void route(String message, Channel channel, String exchangeName, String routeKey) throws IOException {
+    private static void route(String message, Channel channel, String exchangeName,
+                              String routeKey)
+            throws IOException {
         channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
     }
 


Reply via email to