This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 710715271 Fix build and disable helix-front build
710715271 is described below

commit 710715271b626fb5d9b01aeb31e15b4e8d8f45e4
Author: Junkai Xue <[email protected]>
AuthorDate: Fri Feb 23 14:25:56 2024 -0800

    Fix build and disable helix-front build
---
 pom.xml                                            |  2 +-
 recipes/rabbitmq-consumer-group/pom.xml            |  2 +-
 .../helix/recipes/rabbitmq/ConsumerThread.java     | 37 +++++++++++++++-------
 3 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/pom.xml b/pom.xml
index 55a356530..1827b8e1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -316,7 +316,7 @@
     <module>helix-rest</module>
     <module>helix-lock</module>
     <module>helix-agent</module>
-    <module>helix-front</module>
+    <!--<module>helix-front</module> disable build for helix-front-->
     <module>recipes</module>
     <module>helix-view-aggregator</module>
     <module>meta-client</module>
diff --git a/recipes/rabbitmq-consumer-group/pom.xml b/recipes/rabbitmq-consumer-group/pom.xml
index d87467c43..4ebf02f97 100644
--- a/recipes/rabbitmq-consumer-group/pom.xml
+++ b/recipes/rabbitmq-consumer-group/pom.xml
@@ -84,7 +84,7 @@
     <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
-      <version>5.18.0</version>
+      <version>5.20.0</version>
     </dependency>
     <dependency>
       <groupId>com.101tec</groupId>
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
index f2a5a2e47..1c61b4c92 100644
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
@@ -21,10 +21,14 @@ package org.apache.helix.recipes.rabbitmq;
 
 import java.io.IOException;
 
+import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.Envelope;
 
 public class ConsumerThread extends Thread {
   private static final String EXCHANGE_NAME = "topic_logs";
@@ -56,19 +60,14 @@ public class ConsumerThread extends Thread {
      System.out.println(" [*] " + _consumerId + " Waiting for messages on " + bindingKey
           + ". To exit press CTRL+C");
 
-      QueueingConsumer consumer = new QueueingConsumer(channel);
+      Consumer consumer = new MyConsumer(channel);
       channel.basicConsume(queueName, true, consumer);
 
-      while (true) {
-        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
-        String message = new String(delivery.getBody());
-        String routingKey = delivery.getEnvelope().getRoutingKey();
-
-        System.out.println(" [x] " + _consumerId + " Received '" + routingKey + "':'" + message
-            + "'");
-      }
-    } catch (InterruptedException e) {
-      System.err.println(" [-] " + _consumerId + " on " + _partition + " is interrupted ...");
+      String consumerTag = channel.basicConsume(queueName, false, consumer);
+      System.out.println("press any key to terminate");
+      System.in.read();
+      channel.basicCancel(consumerTag);
+      channel.close();
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
@@ -82,4 +81,18 @@ public class ConsumerThread extends Thread {
       }
     }
   }
+
+  private static class MyConsumer extends DefaultConsumer {
+
+    public MyConsumer(Channel channel) {
+      super(channel);
+    }
+
+    @Override
+    public void handleDelivery(String consumerTag, Envelope envelope,
+        AMQP.BasicProperties properties, byte[] body) {
+      System.out.println(
+          " [x] Received '" + envelope.getRoutingKey() + "':'" + new String(body) + "'");
+    }
+  }
 }

Reply via email to