This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 710715271 Fix build and disable helix-front build
710715271 is described below
commit 710715271b626fb5d9b01aeb31e15b4e8d8f45e4
Author: Junkai Xue <[email protected]>
AuthorDate: Fri Feb 23 14:25:56 2024 -0800
Fix build and disable helix-front build
---
pom.xml | 2 +-
recipes/rabbitmq-consumer-group/pom.xml | 2 +-
.../helix/recipes/rabbitmq/ConsumerThread.java | 37 +++++++++++++++-------
3 files changed, 27 insertions(+), 14 deletions(-)
diff --git a/pom.xml b/pom.xml
index 55a356530..1827b8e1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -316,7 +316,7 @@
<module>helix-rest</module>
<module>helix-lock</module>
<module>helix-agent</module>
- <module>helix-front</module>
+ <!--<module>helix-front</module> disable build for helix-front-->
<module>recipes</module>
<module>helix-view-aggregator</module>
<module>meta-client</module>
diff --git a/recipes/rabbitmq-consumer-group/pom.xml b/recipes/rabbitmq-consumer-group/pom.xml
index d87467c43..4ebf02f97 100644
--- a/recipes/rabbitmq-consumer-group/pom.xml
+++ b/recipes/rabbitmq-consumer-group/pom.xml
@@ -84,7 +84,7 @@
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
- <version>5.18.0</version>
+ <version>5.20.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
index f2a5a2e47..1c61b4c92 100644
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
@@ -21,10 +21,14 @@ package org.apache.helix.recipes.rabbitmq;
import java.io.IOException;
+import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.Envelope;
public class ConsumerThread extends Thread {
private static final String EXCHANGE_NAME = "topic_logs";
@@ -56,19 +60,14 @@ public class ConsumerThread extends Thread {
System.out.println(" [*] " + _consumerId + " Waiting for messages on " + bindingKey
+ ". To exit press CTRL+C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
+ Consumer consumer = new MyConsumer(channel);
channel.basicConsume(queueName, true, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- String routingKey = delivery.getEnvelope().getRoutingKey();
-
- System.out.println(" [x] " + _consumerId + " Received '" + routingKey + "':'" + message
- + "'");
- }
- } catch (InterruptedException e) {
- System.err.println(" [-] " + _consumerId + " on " + _partition + " is interrupted ...");
+ String consumerTag = channel.basicConsume(queueName, false, consumer);
+ System.out.println("press any key to terminate");
+ System.in.read();
+ channel.basicCancel(consumerTag);
+ channel.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
@@ -82,4 +81,18 @@ public class ConsumerThread extends Thread {
}
}
}
+
+ private static class MyConsumer extends DefaultConsumer {
+
+ public MyConsumer(Channel channel) {
+ super(channel);
+ }
+
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope,
+ AMQP.BasicProperties properties, byte[] body) {
+ System.out.println(
+ " [x] Received '" + envelope.getRoutingKey() + "':'" + new String(body) + "'");
+ }
+ }
}