This is an automated email from the ASF dual-hosted git repository. yasith pushed a commit to branch cybershuttle-dev in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 3f7df10a72812ccad9191fad7db44874f308d6fb Author: yasith <[email protected]> AuthorDate: Thu Apr 24 01:46:02 2025 -0500 fix rabbitmq deprecation --- .../messaging/core/impl/ExperimentConsumer.java | 30 ++++++++++++---------- .../messaging/core/impl/MessageConsumer.java | 29 +++++++++++---------- .../messaging/core/impl/ProcessConsumer.java | 28 ++++++++++---------- 3 files changed, 45 insertions(+), 42 deletions(-) diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java index 16b75275ad..5025a5e7ca 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java @@ -23,26 +23,29 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.QueueingConsumer; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.MessageHandler; -import org.apache.airavata.model.messaging.event.*; +import org.apache.airavata.model.messaging.event.ExperimentIntermediateOutputsEvent; +import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent; +import org.apache.airavata.model.messaging.event.Message; +import org.apache.airavata.model.messaging.event.MessageType; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.rabbitmq.client.DefaultConsumer; import java.io.IOException; -public class ExperimentConsumer extends QueueingConsumer { +public class ExperimentConsumer extends DefaultConsumer { private static final Logger log = LoggerFactory.getLogger(ExperimentConsumer.class); - private MessageHandler handler; + private final MessageHandler handler; private Channel channel; - private Connection connection; + private final Connection connection; public ExperimentConsumer(MessageHandler messageHandler, Connection connection, Channel channel) { super(channel); @@ -51,12 +54,11 @@ public class ExperimentConsumer extends QueueingConsumer { this.channel = channel; } - @Override public void handleDelivery(String consumerTag, - Envelope envelope, - AMQP.BasicProperties properties, - byte[] body) throws IOException { + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) throws IOException { Message message = new Message(); @@ -99,7 +101,7 @@ public class ExperimentConsumer extends QueueingConsumer { messageContext.setIsRedeliver(envelope.isRedeliver()); handler.onMessage(messageContext); - }else { + } else { log.error("{} message type is not handle in ProcessLaunch Subscriber. Sending ack for " + "delivery tag {} ", message.getMessageType().name(), deliveryTag); sendAck(deliveryTag); @@ -112,11 +114,11 @@ public class ExperimentConsumer extends QueueingConsumer { } - private void sendAck(long deliveryTag){ + private void sendAck(long deliveryTag) { try { - if (channel.isOpen()){ - channel.basicAck(deliveryTag,false); - }else { + if (channel.isOpen()) { + channel.basicAck(deliveryTag, false); + } else { channel = connection.createChannel(); channel.basicQos(ServerSettings.getRabbitmqPrefetchCount()); channel.basicAck(deliveryTag, false); diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/MessageConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/MessageConsumer.java index 306a41f9c2..c5b1d364b4 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/MessageConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/MessageConsumer.java @@ -19,28 +19,29 @@ */ package org.apache.airavata.messaging.core.impl; -import com.rabbitmq.client.*; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Envelope; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.MessageHandler; import org.apache.airavata.model.dbevent.DBEventMessage; -import org.apache.airavata.model.dbevent.DBEventMessageContext; -import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent; import org.apache.airavata.model.messaging.event.Message; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.thrift.TBase; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; +import com.rabbitmq.client.DefaultConsumer; import java.io.IOException; -public class MessageConsumer extends QueueingConsumer { +public class MessageConsumer extends DefaultConsumer { private static final Logger logger = LogManager.getLogger(MessageConsumer.class); - private MessageHandler handler; + private final MessageHandler handler; private Channel channel; - private Connection connection; + private final Connection connection; public MessageConsumer(MessageHandler messageHandler, Connection connection, Channel channel) { super(channel); @@ -65,7 +66,7 @@ public class MessageConsumer extends QueueingConsumer { DBEventMessage dBEventMessage = new DBEventMessage(); ThriftUtils.createThriftFromBytes(message.getEvent(), dBEventMessage); - MessageContext messageContext = new MessageContext((TBase) dBEventMessage, message.getMessageType(), message.getMessageId(), "gatewayId", envelope.getDeliveryTag()); + MessageContext messageContext = new MessageContext(dBEventMessage, message.getMessageType(), message.getMessageId(), "gatewayId", envelope.getDeliveryTag()); handler.onMessage(messageContext); //sendAck(deliveryTag); @@ -76,12 +77,12 @@ public class MessageConsumer extends QueueingConsumer { } - private void sendAck(long deliveryTag){ + private void sendAck(long deliveryTag) { logger.info("sendAck() -> Sending ack. Delivery Tag : " + deliveryTag); try { - if (channel.isOpen()){ - channel.basicAck(deliveryTag,false); - }else { + if (channel.isOpen()) { + channel.basicAck(deliveryTag, false); + } else { channel = connection.createChannel(); channel.basicQos(20); channel.basicAck(deliveryTag, false); diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java index f6f1127e8c..c3b0dcd397 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java @@ -23,7 +23,6 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.QueueingConsumer; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; @@ -37,28 +36,29 @@ import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.rabbitmq.client.DefaultConsumer; import java.io.IOException; -public class ProcessConsumer extends QueueingConsumer{ +public class ProcessConsumer extends DefaultConsumer { private static final Logger log = LoggerFactory.getLogger(ProcessConsumer.class); - private MessageHandler handler; + private final MessageHandler handler; private Channel channel; - private Connection connection; + private final Connection connection; - public ProcessConsumer(MessageHandler messageHandler, Connection connection, Channel channel){ + public ProcessConsumer(MessageHandler messageHandler, Connection connection, Channel channel) { super(channel); this.handler = messageHandler; this.connection = connection; this.channel = channel; } - - @Override public void handleDelivery(String consumerTag, - Envelope envelope, - AMQP.BasicProperties basicProperties, - byte[] body) throws IOException { + @Override + public void handleDelivery(String consumerTag, + Envelope envelope, + AMQP.BasicProperties basicProperties, + byte[] body) throws IOException { Message message = new Message(); @@ -105,11 +105,11 @@ public class ProcessConsumer extends QueueingConsumer{ } - private void sendAck(long deliveryTag){ + private void sendAck(long deliveryTag) { try { - if (channel.isOpen()){ - channel.basicAck(deliveryTag,false); - }else { + if (channel.isOpen()) { + channel.basicAck(deliveryTag, false); + } else { channel = connection.createChannel(); channel.basicQos(ServerSettings.getRabbitmqPrefetchCount()); channel.basicAck(deliveryTag, false);
