This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch cybershuttle-dev
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 3f7df10a72812ccad9191fad7db44874f308d6fb
Author: yasith <[email protected]>
AuthorDate: Thu Apr 24 01:46:02 2025 -0500

    fix rabbitmq deprecation
---
 .../messaging/core/impl/ExperimentConsumer.java    | 30 ++++++++++++----------
 .../messaging/core/impl/MessageConsumer.java       | 29 +++++++++++----------
 .../messaging/core/impl/ProcessConsumer.java       | 28 ++++++++++----------
 3 files changed, 45 insertions(+), 42 deletions(-)

diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
index 16b75275ad..5025a5e7ca 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
@@ -23,26 +23,29 @@ import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.QueueingConsumer;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.model.messaging.event.*;
+import 
org.apache.airavata.model.messaging.event.ExperimentIntermediateOutputsEvent;
+import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.rabbitmq.client.DefaultConsumer;
 
 import java.io.IOException;
 
-public class ExperimentConsumer extends QueueingConsumer {
+public class ExperimentConsumer extends DefaultConsumer {
     private static final Logger log = 
LoggerFactory.getLogger(ExperimentConsumer.class);
 
-    private MessageHandler handler;
+    private final MessageHandler handler;
     private Channel channel;
-    private Connection connection;
+    private final Connection connection;
 
     public ExperimentConsumer(MessageHandler messageHandler, Connection 
connection, Channel channel) {
         super(channel);
@@ -51,12 +54,11 @@ public class ExperimentConsumer extends QueueingConsumer {
         this.channel = channel;
     }
 
-
     @Override
     public void handleDelivery(String consumerTag,
-                               Envelope envelope,
-                               AMQP.BasicProperties properties,
-                               byte[] body) throws IOException {
+                             Envelope envelope,
+                             AMQP.BasicProperties properties,
+                             byte[] body) throws IOException {
 
         Message message = new Message();
 
@@ -99,7 +101,7 @@ public class ExperimentConsumer extends QueueingConsumer {
                 messageContext.setIsRedeliver(envelope.isRedeliver());
                 handler.onMessage(messageContext);
 
-            }else {
+            } else {
                 log.error("{} message type is not handle in ProcessLaunch 
Subscriber. Sending ack for " +
                         "delivery tag {} ", message.getMessageType().name(), 
deliveryTag);
                 sendAck(deliveryTag);
@@ -112,11 +114,11 @@ public class ExperimentConsumer extends QueueingConsumer {
     }
 
 
-    private void sendAck(long deliveryTag){
+    private void sendAck(long deliveryTag) {
         try {
-            if (channel.isOpen()){
-                channel.basicAck(deliveryTag,false);
-            }else {
+            if (channel.isOpen()) {
+                channel.basicAck(deliveryTag, false);
+            } else {
                 channel = connection.createChannel();
                 channel.basicQos(ServerSettings.getRabbitmqPrefetchCount());
                 channel.basicAck(deliveryTag, false);
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/MessageConsumer.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/MessageConsumer.java
index 306a41f9c2..c5b1d364b4 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/MessageConsumer.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/MessageConsumer.java
@@ -19,28 +19,29 @@
  */
 package org.apache.airavata.messaging.core.impl;
 
-import com.rabbitmq.client.*;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Envelope;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.model.dbevent.DBEventMessage;
-import org.apache.airavata.model.dbevent.DBEventMessageContext;
-import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent;
 import org.apache.airavata.model.messaging.event.Message;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TBase;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
+import com.rabbitmq.client.DefaultConsumer;
 
 import java.io.IOException;
 
-public class MessageConsumer extends QueueingConsumer {
+public class MessageConsumer extends DefaultConsumer {
 
     private static final Logger logger = 
LogManager.getLogger(MessageConsumer.class);
 
-    private MessageHandler handler;
+    private final MessageHandler handler;
     private Channel channel;
-    private Connection connection;
+    private final Connection connection;
 
     public MessageConsumer(MessageHandler messageHandler, Connection 
connection, Channel channel) {
         super(channel);
@@ -65,7 +66,7 @@ public class MessageConsumer extends QueueingConsumer {
             DBEventMessage dBEventMessage = new DBEventMessage();
             ThriftUtils.createThriftFromBytes(message.getEvent(), 
dBEventMessage);
 
-            MessageContext messageContext = new MessageContext((TBase) 
dBEventMessage, message.getMessageType(), message.getMessageId(), "gatewayId", 
envelope.getDeliveryTag());
+            MessageContext messageContext = new MessageContext(dBEventMessage, 
message.getMessageType(), message.getMessageId(), "gatewayId", 
envelope.getDeliveryTag());
             handler.onMessage(messageContext);
             //sendAck(deliveryTag);
 
@@ -76,12 +77,12 @@ public class MessageConsumer extends QueueingConsumer {
     }
 
 
-    private void sendAck(long deliveryTag){
+    private void sendAck(long deliveryTag) {
         logger.info("sendAck() -> Sending ack. Delivery Tag : " + deliveryTag);
         try {
-            if (channel.isOpen()){
-                channel.basicAck(deliveryTag,false);
-            }else {
+            if (channel.isOpen()) {
+                channel.basicAck(deliveryTag, false);
+            } else {
                 channel = connection.createChannel();
                 channel.basicQos(20);
                 channel.basicAck(deliveryTag, false);
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
index f6f1127e8c..c3b0dcd397 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
@@ -23,7 +23,6 @@ import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.QueueingConsumer;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
@@ -37,28 +36,29 @@ import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.rabbitmq.client.DefaultConsumer;
 
 import java.io.IOException;
 
-public class ProcessConsumer extends QueueingConsumer{
+public class ProcessConsumer extends DefaultConsumer {
     private static final Logger log = 
LoggerFactory.getLogger(ProcessConsumer.class);
 
-    private MessageHandler handler;
+    private final MessageHandler handler;
     private Channel channel;
-    private Connection connection;
+    private final Connection connection;
 
-    public ProcessConsumer(MessageHandler messageHandler, Connection 
connection, Channel channel){
+    public ProcessConsumer(MessageHandler messageHandler, Connection 
connection, Channel channel) {
         super(channel);
         this.handler = messageHandler;
         this.connection = connection;
         this.channel = channel;
     }
 
-
-    @Override public void handleDelivery(String consumerTag,
-                               Envelope envelope,
-                               AMQP.BasicProperties basicProperties,
-                               byte[] body) throws IOException {
+    @Override
+    public void handleDelivery(String consumerTag,
+                             Envelope envelope,
+                             AMQP.BasicProperties basicProperties,
+                             byte[] body) throws IOException {
 
         Message message = new Message();
 
@@ -105,11 +105,11 @@ public class ProcessConsumer extends QueueingConsumer{
 
     }
 
-    private void sendAck(long deliveryTag){
+    private void sendAck(long deliveryTag) {
         try {
-            if (channel.isOpen()){
-                channel.basicAck(deliveryTag,false);
-            }else {
+            if (channel.isOpen()) {
+                channel.basicAck(deliveryTag, false);
+            } else {
                 channel = connection.createChannel();
                 channel.basicQos(ServerSettings.getRabbitmqPrefetchCount());
                 channel.basicAck(deliveryTag, false);

Reply via email to