This is an automated email from the ASF dual-hosted git repository. yasith pushed a commit to branch styles-and-rules in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 2b2e2e388f96402afb6657ebf604fbcde46936b6 Author: yasithdev <[email protected]> AuthorDate: Fri Jun 6 18:04:39 2025 -0500 remove non-source java files --- .../listener/ExperimentStatusChangedEvent.java | 83 ------ .../apache/airavata/common/utils/ServiceUtils.java | 112 -------- .../airavata/messaging/core/PublisherFactory.java | 88 ------ .../core/impl/RabbitMQProcessLaunchConsumer.java | 307 --------------------- .../core/impl/RabbitMQProcessLaunchPublisher.java | 97 ------- .../messaging/core/impl/RabbitMQProducer.java | 239 ---------------- .../core/impl/RabbitMQStatusPublisher.java | 125 --------- .../core/impl/RabbitMQStatusSubscriber.java | 306 -------------------- .../airavata/orchestrator/core/ValidatorTest.java | 131 --------- .../orchestrator/core/util/SecondValidator.java | 61 ---- .../orchestrator/core/util/TestValidator.java | 73 ----- 11 files changed, 1622 deletions(-) diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/ExperimentStatusChangedEvent.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/ExperimentStatusChangedEvent.java deleted file mode 100644 index 9845acd9ec..0000000000 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/ExperimentStatusChangedEvent.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -//*/ -//package org.apache.airavata.api.server.listener; -// -//import org.apache.airavata.common.utils.listener.AbstractStateChangeRequest; -//import org.apache.airavata.gfac.core.monitor.ExperimentIdentity; -//import org.apache.airavata.model.experiment.ExperimentModelState; -// -///** -// * This is the primary job state object used in -// * through out the monitor module. This use airavata-data-model JobState enum -// * Ideally after processing each event or monitoring message from remote system -// * Each monitoring implementation has to return this object with a state and -// * the monitoring ID -// */ -//public class ExperimentStatusChangedEvent extends AbstractStateChangeRequest { -// private ExperimentState state; -// private ExperimentIdentity identity; -// -// // this constructor can be used in Qstat monitor to handle errors -// public ExperimentStatusChangedEvent() { -// } -// -// public ExperimentStatusChangedEvent(ExperimentIdentity experimentIdentity, ExperimentState state) { -// this.state = state; -// setIdentity(experimentIdentity); -// } -// -// public ExperimentState getState() { -// return state; -// } -// -// public void setState(ExperimentState state) { -// this.state = state; -// } -// -// public ExperimentIdentity getIdentity() { -// return identity; -// } -// -// public void setIdentity(ExperimentIdentity identity) { -// this.identity = identity; -// } -// -// -//} diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServiceUtils.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServiceUtils.java deleted file mode 100644 index 77da47836e..0000000000 --- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServiceUtils.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -// */ -// -//package org.apache.airavata.common.utils; -// -//import java.io.IOException; -//import java.net.SocketException; -// -//import org.apache.airavata.common.exception.ApplicationSettingsException; -//import org.apache.axis2.context.ConfigurationContext; -//import org.apache.axis2.description.TransportInDescription; -//import org.apache.axis2.util.Utils; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//public class ServiceUtils { -// private static final Logger log = LoggerFactory.getLogger(ServiceUtils.class); -//// private static final String REPOSITORY_PROPERTIES = "airavata-server.properties"; -// public static final String IP = "ip"; -// public static final String PORT = "port"; -// -// public static String generateServiceURLFromConfigurationContext( -// ConfigurationContext context, String serviceName) throws IOException, ApplicationSettingsException { -//// URL url = ServiceUtils.class.getClassLoader() -//// .getResource(REPOSITORY_PROPERTIES); -// String localAddress = null; -// String port = null; -//// Properties properties = new Properties(); -// try { -// localAddress = ServerSettings.getSetting(IP); -// } catch (ApplicationSettingsException e) { -// //we will ignore this exception since the properties file will not contain the values -// //when it is ok to retrieve them from the axis2 context -// } -// if(localAddress == null){ -// try { -// localAddress = Utils.getIpAddress(context -// .getAxisConfiguration()); -// } catch (SocketException e) { -// e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. -// } -// } -// String protocol="http"; -// if(ServerSettings.isEnableHttps()){ -// protocol="https"; -// } -// -// try { -// port = ServerSettings.getTomcatPort(protocol); -// } catch (ApplicationSettingsException e) { -// //we will ignore this exception since the properties file will not contain the values -// //when it is ok to retrieve them from the axis2 context -// } -// if (port == null) { -// TransportInDescription transportInDescription = context -// .getAxisConfiguration().getTransportsIn() -// .get(protocol); -// if (transportInDescription != null -// && transportInDescription.getParameter(PORT) != null) { -// port = (String) transportInDescription -// .getParameter(PORT).getValue(); -// } -// } -// localAddress = protocol+"://" + localAddress + ":" + port; -// localAddress = localAddress + "/" -// //We are not using axis2 config context to get the context root because it is invalid -// //+ context.getContextRoot() + "/" -// //FIXME: the context root will be correct after updating the web.xml -// + ServerSettings.getServerContextRoot() + "/" -// + context.getServicePath() + "/" -// + serviceName; -// log.debug("Service Address Configured:" + localAddress); -// return localAddress; -// } -//} diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java deleted file mode 100644 index 2ab6e36e39..0000000000 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -// */ -// -//package org.apache.airavata.messaging.core; -// -//import org.apache.airavata.common.exception.AiravataException; -//import org.apache.airavata.common.utils.ServerSettings; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//public class PublisherFactory { -// private static Logger log = LoggerFactory.getLogger(PublisherFactory.class); -// -// public static Publisher createActivityPublisher() throws AiravataException { -// String activityPublisher = ServerSettings.getStatusPublisher(); -// -// if (activityPublisher == null) { -// String s = "Activity publisher is not specified"; -// log.error(s); -// throw new AiravataException(s); -// } -// -// try { -// Class<? extends Publisher> aPublisher = Class.forName(activityPublisher).asSubclass(Publisher.class); -// return aPublisher.newInstance(); -// } catch (Exception e) { -// String msg = "Failed to load the publisher from the publisher class property: " + activityPublisher; -// log.error(msg, e); -// throw new AiravataException(msg, e); -// } -// } -// -// public static Publisher createTaskLaunchPublisher() throws AiravataException { -// String taskLaunchPublisher = ServerSettings.getTaskLaunchPublisher(); -// -// if (taskLaunchPublisher == null) { -// String s = "Task launch publisher is not specified"; -// log.error(s); -// throw new AiravataException(s); -// } -// -// try { -// Class<? extends Publisher> aPublisher = Class.forName(taskLaunchPublisher).asSubclass(Publisher.class); -// return aPublisher.newInstance(); -// } catch (Exception e) { -// String msg = "Failed to load the publisher from the publisher class property: " + taskLaunchPublisher; -// log.error(msg, e); -// throw new AiravataException(msg, e); -// } -// } -//} diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java deleted file mode 100644 index 9bb26aeb48..0000000000 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java +++ /dev/null @@ -1,307 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -//*/ -//package org.apache.airavata.messaging.core.impl; -// -//import com.rabbitmq.client.*; -//import org.apache.airavata.common.exception.AiravataException; -//import org.apache.airavata.common.exception.ApplicationSettingsException; -//import org.apache.airavata.common.utils.AiravataUtils; -//import org.apache.airavata.common.utils.ServerSettings; -//import org.apache.airavata.common.utils.ThriftUtils; -//import org.apache.airavata.messaging.core.MessageContext; -//import org.apache.airavata.messaging.core.MessageHandler; -//import org.apache.airavata.messaging.core.MessagingConstants; -//import org.apache.airavata.model.messaging.event.*; -//import org.apache.thrift.TBase; -//import org.apache.thrift.TException; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import java.io.IOException; -//import java.util.ArrayList; -//import java.util.HashMap; -//import java.util.List; -//import java.util.Map; -// -//public class RabbitMQProcessLaunchConsumer { -// private final static Logger logger = LoggerFactory.getLogger(RabbitMQProcessLaunchConsumer.class); -// private static Logger log = LoggerFactory.getLogger(RabbitMQStatusSubscriber.class); -// -// private String taskLaunchExchangeName; -// private String url; -// private Connection connection; -// private Channel channel; -// private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>(); -// private boolean durableQueue; -// private MessageHandler messageHandler; -// private int prefetchCount; -// -// -// public RabbitMQProcessLaunchConsumer() throws AiravataException { -// try { -// url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); -// durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE)); -// taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME); -// prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64))); -// createConnection(); -// } catch (ApplicationSettingsException e) { -// String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; -// log.error(message, e); -// throw new AiravataException(message, e); -// } -// } -// -// public RabbitMQProcessLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException { -// this.taskLaunchExchangeName = exchangeName; -// this.url = brokerUrl; -// -// createConnection(); -// } -// -// private void createConnection() throws AiravataException { -// try { -// ConnectionFactory connectionFactory = new ConnectionFactory(); -// connectionFactory.setUri(url); -// connectionFactory.setAutomaticRecoveryEnabled(true); -// connection = connectionFactory.newConnection(); -// connection.addShutdownListener(new ShutdownListener() { -// public void shutdownCompleted(ShutdownSignalException cause) { -// } -// }); -// log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName); -// -// channel = connection.createChannel(); -// channel.basicQos(prefetchCount); -// -//// channel.exchangeDeclare(taskLaunchExchangeName, "fanout"); -// -// } catch (Exception e) { -// String msg = "could not open channel for exchange " + taskLaunchExchangeName; -// log.error(msg); -// throw new AiravataException(msg, e); -// } -// } -// -// public void reconnect() throws AiravataException{ -// if(messageHandler!=null) { -// try { -// listen(messageHandler); -// } catch (AiravataException e) { -// String msg = "could not open channel for exchange " + taskLaunchExchangeName; -// log.error(msg); -// throw new AiravataException(msg, e); -// -// } -// } -// } -// public String listen(final MessageHandler handler) throws AiravataException { -// try { -// messageHandler = handler; -// Map<String, Object> props = handler.getProperties(); -// final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY); -// if (routing == null) { -// throw new IllegalArgumentException("The routing key must be present"); -// } -// List<String> keys = new ArrayList<String>(); -// if (routing instanceof List) { -// for (Object o : (List)routing) { -// keys.add(o.toString()); -// } -// } else if (routing instanceof String) { -// keys.add((String) routing); -// } -// -// String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE); -// String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG); -// if (queueName == null) { -// if (!channel.isOpen()) { -// channel = connection.createChannel(); -// channel.basicQos(prefetchCount); -//// channel.exchangeDeclare(taskLaunchExchangeName, "fanout"); -// } -// queueName = channel.queueDeclare().getQueue(); -// } else { -// -// channel.queueDeclare(queueName, durableQueue, false, false, null); -// } -// -// final String id = getId(keys, queueName); -// if (queueDetailsMap.containsKey(id)) { -// throw new IllegalStateException("This subscriber is already defined for this Subscriber, " + -// "cannot define the same subscriber twice"); -// } -// -// if (consumerTag == null) { -// consumerTag = "default"; -// } -// -// // bind all the routing keys -//// for (String routingKey : keys) { -//// channel.queueBind(queueName, taskLaunchExchangeName, routingKey); -//// } -// // autoAck=false, we will ack after task is done -// channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) { -// @Override -// public void handleDelivery(String consumerTag, -// Envelope envelope, -// AMQP.BasicProperties properties, -// byte[] body) { -// Message message = new Message(); -// -// try { -// ThriftUtils.createThriftFromBytes(body, message); -// TBase event = null; -// String gatewayId = null; -// long deliveryTag = envelope.getDeliveryTag(); -// if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) { -// ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent(); -// ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent); -// log.debug(" Message Received with message id '" + message.getMessageId() -// + "' and with message type '" + message.getMessageType() + "' for experimentId:" + -// " " + -// processSubmitEvent.getProcessId()); -// event = processSubmitEvent; -// gatewayId = processSubmitEvent.getGatewayId(); -// MessageContext messageContext = new MessageContext(event, message.getMessageType(), -// message.getMessageId(), gatewayId, deliveryTag); -// messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime())); -// messageContext.setIsRedeliver(envelope.isRedeliver()); -// handler.onMessage(messageContext); -// } else { -// log.error("{} message type is not handle in ProcessLaunch Subscriber. Sending ack for " + -// "delivery tag {} ", message.getMessageType().name(), deliveryTag); -// sendAck(deliveryTag); -// } -// } catch (TException e) { -// String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id; -// log.warn(msg, e); -// } -// } -// -// @Override -// public void handleCancel(String consumerTag) throws IOException { -// super.handleCancel(consumerTag); -// log.info("Subscriber cancelled : " + consumerTag); -// } -// }); -// -// // save the name for deleting the queue -// queueDetailsMap.put(id, new QueueDetails(queueName, keys)); -// return id; -// } catch (Exception e) { -// String msg = "could not open channel for exchange " + taskLaunchExchangeName; -// log.error(msg); -// throw new AiravataException(msg, e); -// } -// } -// -// public void stopListen(final String id) throws AiravataException { -// QueueDetails details = queueDetailsMap.get(id); -// if (details != null) { -// try { -// for (String key : details.getRoutingKeys()) { -// channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key); -// } -// } catch (IOException e) { -// String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName; -// log.debug(msg); -// } -// } -// } -// -// /** -// * Private class for holding some information about the consumers registered -// */ -// private class QueueDetails { -// String queueName; -// -// List<String> routingKeys; -// -// private QueueDetails(String queueName, List<String> routingKeys) { -// this.queueName = queueName; -// this.routingKeys = routingKeys; -// } -// -// public String getQueueName() { -// return queueName; -// } -// -// public List<String> getRoutingKeys() { -// return routingKeys; -// } -// } -// -// private String getId(List<String> routingKeys, String queueName) { -// String id = ""; -// for (String key : routingKeys) { -// id = id + "_" + key; -// } -// return id + "_" + queueName; -// } -// -// public void close() { -// if (connection != null) { -// try { -// connection.close(); -// } catch (IOException ignore) { -// } -// } -// } -// public boolean isOpen(){ -// if(connection!=null){ -// return connection.isOpen(); -// } -// return false; -// } -// -// public void sendAck(long deliveryTag){ -// try { -// if (channel.isOpen()){ -// channel.basicAck(deliveryTag,false); -// }else { -// channel = connection.createChannel(); -// channel.basicQos(prefetchCount); -// channel.basicAck(deliveryTag, false); -// } -// } catch (IOException e) { -// logger.error(e.getMessage(), e); -// } -// } -//} diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java deleted file mode 100644 index fbcb54e1a5..0000000000 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -//*/ -//package org.apache.airavata.messaging.core.impl; -// -//import org.apache.airavata.common.exception.AiravataException; -//import org.apache.airavata.common.exception.ApplicationSettingsException; -//import org.apache.airavata.common.utils.ServerSettings; -//import org.apache.airavata.common.utils.ThriftUtils; -//import org.apache.airavata.messaging.core.MessageContext; -//import org.apache.airavata.messaging.core.MessagingConstants; -//import org.apache.airavata.messaging.core.Publisher; -//import org.apache.airavata.model.messaging.event.*; -//import org.apache.thrift.TException; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//public class RabbitMQProcessLaunchPublisher implements Publisher{ -// private final static Logger log = LoggerFactory.getLogger(RabbitMQProcessLaunchPublisher.class); -// private String launchTask; -// -// private RabbitMQProducer rabbitMQProducer; -// -// public RabbitMQProcessLaunchPublisher() throws Exception { -// String brokerUrl; -// try { -// brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); -// launchTask = ServerSettings.getRabbitmqProcessLaunchQueueName(); -// } catch (ApplicationSettingsException e) { -// String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; -// log.error(message, e); -// throw new AiravataException(message, e); -// } -// rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null); -// rabbitMQProducer.open(); -// } -// -// public void publish(MessageContext msgCtx) throws AiravataException { -// try { -// log.info("Publishing to launch queue ..."); -// byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); -// Message message = new Message(); -// message.setEvent(body); -// message.setMessageId(msgCtx.getMessageId()); -// message.setMessageType(msgCtx.getType()); -// message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); -// String routingKey = launchTask; -// byte[] messageBody = ThriftUtils.serializeThriftObject(message); -// rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey); -// log.info("Successfully published to launch queue ..."); -// } catch (TException e) { -// String msg = "Error while deserializing the object"; -// log.error(msg, e); -// throw new AiravataException(msg, e); -// } catch (Exception e) { -// String msg = "Error while sending to rabbitmq"; -// log.error(msg, e); -// throw new AiravataException(msg, e); -// } -// } -//} diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java deleted file mode 100644 index 4e0c766141..0000000000 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java +++ /dev/null @@ -1,239 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -// */ -// -//package org.apache.airavata.messaging.core.impl; -// -//import com.rabbitmq.client.*; -//import org.apache.airavata.common.exception.AiravataException; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import java.io.IOException; -// -//public class RabbitMQProducer { -// public static final int DEFAULT_PRE_FETCH = 64; -// -// private static Logger log = LoggerFactory.getLogger(RabbitMQProducer.class); -// -// private Connection connection; -// -// private Channel channel; -// -// private QueueingConsumer consumer; -// -// private String consumerTag; -// -// private String exchangeName; -// -// private int prefetchCount = DEFAULT_PRE_FETCH; -// -// private boolean isReQueueOnFail = false; -// -// private String url; -// -// private String getExchangeType = "topic"; -// -// -// public RabbitMQProducer(String url, String exchangeName,String getExchangeType) { -// this.exchangeName = exchangeName; -// this.url = url; -// this.getExchangeType = getExchangeType; -// } -// -// public RabbitMQProducer(String url, String exchangeName) { -// this.exchangeName = exchangeName; -// this.url = url; -// } -// -// public void setPrefetchCount(int prefetchCount) { -// this.prefetchCount = prefetchCount; -// } -// -// public void setReQueueOnFail(boolean isReQueueOnFail) { -// this.isReQueueOnFail = isReQueueOnFail; -// } -// -// private void reset() { -// consumerTag = null; -// } -// -// private void reInitIfNecessary() throws Exception { -// if (consumerTag == null || consumer == null) { -// close(); -// open(); -// } -// } -// -// public void close() { -// log.info("Closing channel to exchange {}", exchangeName); -// try { -// if (channel != null && channel.isOpen()) { -// if (consumerTag != null) { -// channel.basicCancel(consumerTag); -// } -// channel.close(); -// } -// } catch (Exception e) { -// log.debug("error closing channel and/or cancelling consumer", e); -// } -// try { -// log.info("closing connection to rabbitmq: " + connection); -// connection.close(); -// } catch (Exception e) { -// log.debug("error closing connection", e); -// } -// consumer = null; -// consumerTag = null; -// channel = null; -// connection = null; -// } -// -// public void open() throws AiravataException { -// try { -// connection = createConnection(); -// channel = connection.createChannel(); -// if (prefetchCount > 0) { -// log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName); -// channel.basicQos(prefetchCount); -// } -// if(exchangeName!=null) { -// channel.exchangeDeclare(exchangeName, getExchangeType, false); -// } -// } catch (Exception e) { -// reset(); -// String msg = "could not open channel for exchange " + exchangeName; -// log.error(msg); -// throw new AiravataException(msg, e); -// } -// } -// -// public void send(byte []message, String routingKey) throws Exception { -// try { -// channel.basicPublish(exchangeName, routingKey, null, message); -// } catch (IOException e) { -// String msg = "Failed to publish message to exchange: " + exchangeName; -// log.error(msg, e); -// throw new Exception(msg, e); -// } -// } -// -// public void sendToWorkerQueue(byte []message, String routingKey) throws Exception { -// try { -// channel.basicPublish( "", routingKey, -// MessageProperties.PERSISTENT_TEXT_PLAIN, -// message); -// } catch (IOException e) { -// String msg = "Failed to publish message to exchange: " + exchangeName; -// log.error(msg, e); -// throw new Exception(msg, e); -// } -// } -// -// private Connection createConnection() throws IOException { -// try { -// ConnectionFactory connectionFactory = new ConnectionFactory(); -// connectionFactory.setUri(url); -// connectionFactory.setAutomaticRecoveryEnabled(true); -// Connection connection = connectionFactory.newConnection(); -// connection.addShutdownListener(new ShutdownListener() { -// public void shutdownCompleted(ShutdownSignalException cause) { -// } -// }); -// log.info("connected to rabbitmq: " + connection + " for " + exchangeName); -// return connection; -// } catch (Exception e) { -// log.info("connection failed to rabbitmq: " + connection + " for " + exchangeName); -// return null; -// } -// } -// -// public void ackMessage(Long msgId) throws Exception { -// try { -// channel.basicAck(msgId, false); -// } catch (ShutdownSignalException sse) { -// reset(); -// String msg = "shutdown signal received while attempting to ack message"; -// log.error(msg, sse); -// throw new Exception(msg, sse); -// } catch (Exception e) { -// String s = "could not ack for msgId: " + msgId; -// log.error(s, e); -// throw new Exception(s, e); -// } -// } -// -// public void failMessage(Long msgId) throws Exception { -// if (isReQueueOnFail) { -// failWithRedelivery(msgId); -// } else { -// deadLetter(msgId); -// } -// } -// -// public void failWithRedelivery(Long msgId) throws Exception { -// try { -// channel.basicReject(msgId, true); -// } catch (ShutdownSignalException sse) { -// reset(); -// String msg = "shutdown signal received while attempting to fail with redelivery"; -// log.error(msg, sse); -// throw new Exception(msg, sse); -// } catch (Exception e) { -// String msg = "could not fail with redelivery for msgId: " + msgId; -// log.error(msg, e); -// throw new Exception(msg, e); -// } -// } -// -// public void deadLetter(Long msgId) throws Exception { -// try { -// channel.basicReject(msgId, false); -// } catch (ShutdownSignalException sse) { -// reset(); -// String msg = "shutdown signal received while attempting to fail with no redelivery"; -// log.error(msg, sse); -// throw new Exception(msg, sse); -// } catch (Exception e) { -// String msg = "could not fail with dead-lettering (when configured) for msgId: " + msgId; -// log.error(msg, e); -// throw new Exception(msg, e); -// } -// } -//} diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java deleted file mode 100644 index 80e80f9920..0000000000 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -// */ -// -//package org.apache.airavata.messaging.core.impl; -// -//import org.apache.airavata.common.exception.AiravataException; -//import org.apache.airavata.common.exception.ApplicationSettingsException; -//import org.apache.airavata.common.utils.ServerSettings; -//import org.apache.airavata.common.utils.ThriftUtils; -//import org.apache.airavata.messaging.core.MessageContext; -//import org.apache.airavata.messaging.core.MessagingConstants; -//import org.apache.airavata.messaging.core.Publisher; -//import org.apache.airavata.model.messaging.event.*; -//import org.apache.thrift.TException; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//public class RabbitMQStatusPublisher implements Publisher { -// -// private static Logger log = LoggerFactory.getLogger(RabbitMQStatusPublisher.class); -// -// private RabbitMQProducer rabbitMQProducer; -// -//// StatCounter statCounter = StatCounter.getInstance(); -// -// public RabbitMQStatusPublisher() throws AiravataException { -// String brokerUrl; -// String exchangeName; -// try { -// brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); -// exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); -// } catch (ApplicationSettingsException e) { -// String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; -// log.error(message, e); -// throw new AiravataException(message, e); -// } -// rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName); -// rabbitMQProducer.open(); -// } -// -// public void publish(MessageContext msgCtx) throws AiravataException { -// try { -// log.info("Publishing status to rabbitmq..."); -// byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); -// Message message = new Message(); -// message.setEvent(body); -// message.setMessageId(msgCtx.getMessageId()); -// message.setMessageType(msgCtx.getType()); -// message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); -// String gatewayId = msgCtx.getGatewayId(); -// String routingKey = null; -// if (msgCtx.getType() == MessageType.EXPERIMENT) { -// ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent(); -// routingKey = gatewayId + "." + event.getExperimentId(); -// } else if (msgCtx.getType() == MessageType.TASK) { -// TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); -// routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + -// event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId(); -// } else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) { -// TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent(); -// routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + -// event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId(); -// } else if (msgCtx.getType() == MessageType.PROCESS) { -// ProcessStatusChangeEvent event = (ProcessStatusChangeEvent) msgCtx.getEvent(); -// ProcessIdentifier processIdentifier = event.getProcessIdentity(); -// routingKey = gatewayId + "." + processIdentifier.getExperimentId() + "." + processIdentifier.getProcessId(); -// } else if (msgCtx.getType() == MessageType.JOB) { -// JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent(); -// JobIdentifier identity = event.getJobIdentity(); -// routingKey = gatewayId + "." + identity.getExperimentId() + "." + -// identity.getProcessId() + "." + -// identity.getTaskId() + "." + -// identity.getJobId(); -// } -// byte[] messageBody = ThriftUtils.serializeThriftObject(message); -// rabbitMQProducer.send(messageBody, routingKey); -//// statCounter.add(message); -// } catch (TException e) { -// String msg = "Error while deserializing the object"; -// log.error(msg, e); -// throw new AiravataException(msg, e); -// } catch (Exception e) { -// String msg = "Error while sending to rabbitmq"; -// log.error(msg, e); -// throw new AiravataException(msg, e); -// } -// } -//} diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusSubscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusSubscriber.java deleted file mode 100644 index ef8b3f636d..0000000000 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusSubscriber.java +++ /dev/null @@ -1,306 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -// */ -// -//package org.apache.airavata.messaging.core.impl; -// -// -//import com.rabbitmq.client.*; -//import org.apache.airavata.common.exception.AiravataException; -//import org.apache.airavata.common.exception.ApplicationSettingsException; -//import org.apache.airavata.common.utils.AiravataUtils; -//import org.apache.airavata.common.utils.ServerSettings; -//import org.apache.airavata.common.utils.ThriftUtils; -//import org.apache.airavata.messaging.core.Subscriber; -//import org.apache.airavata.messaging.core.MessageContext; -//import org.apache.airavata.messaging.core.MessageHandler; -//import org.apache.airavata.messaging.core.MessagingConstants; -//import org.apache.airavata.model.messaging.event.*; -//import org.apache.thrift.TBase; -//import org.apache.thrift.TException; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import jakarta.annotation.Nonnull; -//import java.io.IOException; -//import java.util.ArrayList; -//import java.util.HashMap; -//import java.util.List; -//import java.util.Map; -// -//public class RabbitMQStatusSubscriber implements Subscriber { -// public static final String EXCHANGE_TYPE = "topic"; -// private static Logger log = LoggerFactory.getLogger(RabbitMQStatusSubscriber.class); -// -// private String exchangeName; -// private String url; -// private Connection connection; -// private Channel channel; -// private int prefetchCount; -// private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>(); -// -// public RabbitMQStatusSubscriber() throws AiravataException { -// try { -// url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); -// exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); -// prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64))); -// createConnection(); -// } catch (ApplicationSettingsException e) { -// String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; -// log.error(message, e); -// throw new AiravataException(message, e); -// } -// } -// -// public RabbitMQStatusSubscriber(String brokerUrl, String exchangeName) throws AiravataException { -// this.exchangeName = exchangeName; -// this.url = brokerUrl; -// -// createConnection(); -// } -// -// private void createConnection() throws AiravataException { -// try { -// ConnectionFactory connectionFactory = new ConnectionFactory(); -// connectionFactory.setUri(url); -// connectionFactory.setAutomaticRecoveryEnabled(true); -// connection = connectionFactory.newConnection(); -// connection.addShutdownListener(new ShutdownListener() { -// public void shutdownCompleted(ShutdownSignalException cause) { -// } -// }); -// log.info("connected to rabbitmq: " + connection + " for " + exchangeName); -// -// channel = connection.createChannel(); -// channel.basicQos(prefetchCount); -// channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE, false); -// -// } catch (Exception e) { -// String msg = "could not open channel for exchange " + exchangeName; -// log.error(msg); -// throw new AiravataException(msg, e); -// } -// } -// -// public String listen(final MessageHandler handler) throws AiravataException { -// try { -// Map<String, Object> props = handler.getProperties(); -// final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY); -// if (routing == null) { -// throw new IllegalArgumentException("The routing key must be present"); -// } -// -// List<String> keys = new ArrayList<String>(); -// if (routing instanceof List) { -// for (Object o : (List)routing) { -// keys.add(o.toString()); -// } -// } else if (routing instanceof String) { -// keys.add((String) routing); -// } -// -// String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE); -// String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG); -// if (queueName == null) { -// if (!channel.isOpen()) { -// channel = connection.createChannel(); -// channel.exchangeDeclare(exchangeName, "topic", false); -// } -// queueName = channel.queueDeclare().getQueue(); -// } else { -// channel.queueDeclare(queueName, true, false, false, null); -// } -// -// final String id = getId(keys, queueName); -// if (queueDetailsMap.containsKey(id)) { -// throw new IllegalStateException("This subscriber is already defined for this Subscriber, " + -// "cannot define the same subscriber twice"); -// } -// -// if (consumerTag == null) { -// consumerTag = "default"; -// } -// -// // bind all the routing keys -// for (String routingKey : keys) { -// channel.queueBind(queueName, exchangeName, routingKey); -// } -// -// channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) { -// @Override -// public void handleDelivery(String consumerTag, -// Envelope envelope, -// AMQP.BasicProperties properties, -// byte[] body) { -// Message message = new Message(); -// -// try { -// ThriftUtils.createThriftFromBytes(body, message); -// TBase event = null; -// String gatewayId = null; -// -// if (message.getMessageType().equals(MessageType.EXPERIMENT)) { -// ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(); -// ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent); -// log.debug(" Message Received with message id '" + message.getMessageId() -// + "' and with message type '" + message.getMessageType() + "' with status " + -// experimentStatusChangeEvent.getState()); -// event = experimentStatusChangeEvent; -// gatewayId = experimentStatusChangeEvent.getGatewayId(); -// } else if (message.getMessageType().equals(MessageType.PROCESS)) { -// ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(); -// ThriftUtils.createThriftFromBytes(message.getEvent(), processStatusChangeEvent); -// log.debug("Message Recieved with message id :" + message.getMessageId() + " and with " + -// "message type " + message.getMessageType() + " with status " + -// processStatusChangeEvent.getState()); -// event = processStatusChangeEvent; -// gatewayId = processStatusChangeEvent.getProcessIdentity().getGatewayId(); -// } else if (message.getMessageType().equals(MessageType.TASK)) { -// TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(); -// ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent); -// log.debug(" Message Received with message id '" + message.getMessageId() -// + "' and with message type '" + message.getMessageType() + "' with status " + -// taskStatusChangeEvent.getState()); -// event = taskStatusChangeEvent; -// gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId(); -// }else if (message.getMessageType() == MessageType.PROCESSOUTPUT) { -// TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent(); -// ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent); -// log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType()); -// event = taskOutputChangeEvent; -// gatewayId = taskOutputChangeEvent.getTaskIdentity().getGatewayId(); -// } else if (message.getMessageType().equals(MessageType.JOB)) { -// JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(); -// ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent); -// log.debug(" Message Received with message id '" + message.getMessageId() -// + "' and with message type '" + message.getMessageType() + "' with status " + -// jobStatusChangeEvent.getState()); -// event = jobStatusChangeEvent; -// gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId(); -// } else if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) { -// TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(); -// ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent); -// log.debug(" Message Received with message id '" + message.getMessageId() -// + "' and with message type '" + message.getMessageType() + "' for experimentId: " + -// taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId()); -// event = taskSubmitEvent; -// gatewayId = taskSubmitEvent.getGatewayId(); -// } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) { -// TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent(); -// ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent); -// log.debug(" Message Received with message id '" + message.getMessageId() -// + "' and with message type '" + message.getMessageType() + "' for experimentId: " + -// taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId()); -// event = taskTerminateEvent; -// gatewayId = null; -// } -// MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId); -// messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime())); -// messageContext.setIsRedeliver(envelope.isRedeliver()); -// handler.onMessage(messageContext); -// } catch (TException e) { -// String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id; -// log.warn(msg, e); -// } -// } -// }); -// // save the name for deleting the queue -// queueDetailsMap.put(id, new QueueDetails(queueName, keys)); -// return id; -// } catch (Exception e) { -// String msg = "could not open channel for exchange " + exchangeName; -// log.error(msg); -// throw new AiravataException(msg, e); -// } -// } -// -// public void stopListen(final String id) throws AiravataException { -// QueueDetails details = queueDetailsMap.get(id); -// if (details != null) { -// try { -// for (String key : details.getRoutingKeys()) { -// channel.queueUnbind(details.getQueueName(), exchangeName, key); -// } -// channel.queueDelete(details.getQueueName(), true, true); -// } catch (IOException e) { -// String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName; -// log.debug(msg); -// } -// } -// } -// -// /** -// * Private class for holding some information about the consumers registered -// */ -// private class QueueDetails { -// String queueName; -// -// List<String> routingKeys; -// -// private QueueDetails(String queueName, List<String> routingKeys) { -// this.queueName = queueName; -// this.routingKeys = routingKeys; -// } -// -// public String getQueueName() { -// return queueName; -// } -// -// public List<String> getRoutingKeys() { -// return routingKeys; -// } -// } -// -// private String getId(List<String> routingKeys, String queueName) { -// String id = ""; -// for (String key : routingKeys) { -// id = id + "_" + key; -// } -// return id + "_" + queueName; -// } -// -// public void close() { -// if (connection != null) { -// try { -// connection.close(); -// } catch (IOException ignore) { -// } -// } -// } -//} diff --git a/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/ValidatorTest.java b/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/ValidatorTest.java deleted file mode 100644 index 6bb9c61c59..0000000000 --- a/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/ValidatorTest.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -//*/ -//package org.apache.airavata.orchestrator.core; -// -//import org.apache.airavata.common.utils.AiravataUtils; -//import org.apache.airavata.model.error.LaunchValidationException; -//import org.apache.airavata.model.util.ExperimentModelUtil; -//import org.apache.airavata.model.experiment.*; -//import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants; -//import org.apache.airavata.orchestrator.cpi.Orchestrator; -//import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl; -//import org.apache.airavata.registry.core.experiment.registry.jpa.impl.RegistryFactory; -//import org.apache.airavata.registry.cpi.ParentDataType; -//import org.apache.airavata.registry.cpi.Registry; -//import org.junit.Assert; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -//import org.testng.annotations.BeforeTest; -//import org.testng.annotations.Test; -// -//import java.util.ArrayList; -//import java.util.List; -// -//public class ValidatorTest extends BaseOrchestratorTest { -// private static final Logger log = LoggerFactory.getLogger(NewOrchestratorTest.class); -// -// private Orchestrator orchestrator; -// private List<TaskDetails> tasks; -// -// @BeforeTest -// public void setUp() throws Exception { -// AiravataUtils.setExecutionAsServer(); -// super.setUp(); -// System.setProperty(OrchestratorConstants.JOB_VALIDATOR,"org.apache.airavata.orchestrator.core.util.TestValidator,org.apache.airavata.orchestrator.core.util.SecondValidator"); -// System.setProperty("enable.validation", "true"); -// orchestrator = new SimpleOrchestratorImpl(); -// } -// -// -// -// @Test -// public void testValidator() throws Exception { -// // creating host description -// List<DataObjectType> exInputs = new ArrayList<DataObjectType>(); -// DataObjectType input = new DataObjectType(); -// input.setKey("echo_input"); -// input.setType(DataType.STRING); -// input.setValue("echo_output=Hello World"); -// exInputs.add(input); -// -// List<DataObjectType> exOut = new ArrayList<DataObjectType>(); -// DataObjectType output = new DataObjectType(); -// output.setKey("echo_output"); -// output.setType(DataType.STRING); -// output.setValue(""); -// exOut.add(output); -// -// Experiment simpleExperiment = -// ExperimentModelUtil.createSimpleExperiment("default", "admin", "echoExperiment", "SimpleEcho0", "SimpleEcho0", exInputs); -// simpleExperiment.setExperimentOutputs(exOut); -// -// WorkflowNodeDetails test = ExperimentModelUtil.createWorkflowNode("test", null); -// ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling("localhost", 1, 1, 1, "normal", 0, 0, 1, "sds128"); -// scheduling.setResourceHostId("localhost"); -// UserConfigurationData userConfigurationData = new UserConfigurationData(); -// userConfigurationData.setAiravataAutoSchedule(false); -// userConfigurationData.setOverrideManualScheduledParams(false); -// userConfigurationData.setComputationalResourceScheduling(scheduling); -// simpleExperiment.setUserConfigurationData(userConfigurationData); -// -// Registry defaultRegistry = RegistryFactory.getDefaultExpCatalog(); -// String experimentId = (String)defaultRegistry.add(ParentDataType.EXPERIMENT, simpleExperiment); -// -// simpleExperiment.setExperimentID(experimentId); -// tasks = orchestrator.createTasks(experimentId); -// -// Assert.assertTrue(orchestrator.validateExperiment(simpleExperiment, test, tasks.get(0)).isValidationState()); -// -// simpleExperiment.setExperimentID(null); -// -// try { -// orchestrator.validateExperiment(simpleExperiment, test, tasks.get(0)).isValidationState(); -// }catch(LaunchValidationException e){ -// Assert.assertTrue(true); -// } -// tasks.get(0).setTaskID(null); -// try { -// orchestrator.validateExperiment(simpleExperiment, test, tasks.get(0)); -// }catch (LaunchValidationException e){ -// Assert.assertTrue(true); -// } -// } -// -//} diff --git a/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/util/SecondValidator.java b/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/util/SecondValidator.java deleted file mode 100644 index 2bcd0fd303..0000000000 --- a/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/util/SecondValidator.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -//*/ -//package org.apache.airavata.orchestrator.core.util; -// -//import org.apache.airavata.model.error.ValidationResults; -//import org.apache.airavata.model.error.ValidatorResult; -//import org.apache.airavata.model.experiment.ExperimentModel; -//import org.apache.airavata.model.experiment.TaskDetails; -//import org.apache.airavata.model.experiment.WorkflowNodeDetails; -//import org.apache.airavata.orchestrator.core.validator.JobMetadataValidator; -// -//public class SecondValidator implements JobMetadataValidator { -// public ValidationResults validate(Experiment experiment, WorkflowNodeDetails workflowNodeDetail, TaskDetails taskID) { -// ValidationResults validationResults = new ValidationResults(); -// validationResults.setValidationState(true); -// if(taskID.getTaskID() == null) { -// ValidatorResult validatorResult = new ValidatorResult(false); -// validatorResult.setErrorDetails("No taskID is set, so Validation failed"); -// validationResults.addToValidationResultList(validatorResult); -// validationResults.setValidationState(false); -// } -// return validationResults; -// } -//} diff --git a/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/util/TestValidator.java b/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/util/TestValidator.java deleted file mode 100644 index e6fe52eed2..0000000000 --- a/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/util/TestValidator.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -//*/ -//package org.apache.airavata.orchestrator.core.util; -// -//import org.apache.airavata.model.error.ValidationResults; -//import org.apache.airavata.model.error.ValidatorResult; -//import org.apache.airavata.model.experiment.ExperimentModel; -//import org.apache.airavata.model.experiment.TaskDetails; -//import org.apache.airavata.model.experiment.WorkflowNodeDetails; -//import org.apache.airavata.orchestrator.core.exception.OrchestratorException; -//import org.apache.airavata.orchestrator.core.validator.JobMetadataValidator; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//public class TestValidator implements JobMetadataValidator { -// private final static Logger logger = LoggerFactory.getLogger(TestValidator.class); -// -// public ValidationResults validate(Experiment experiment, WorkflowNodeDetails workflowNodeDetail, TaskDetails taskID) { -// ValidationResults validationResults = new ValidationResults(); -// validationResults.setValidationState(true); -// if (experiment.getProjectID() == null) { -// logger.error("Project ID is not set"); -// ValidatorResult validatorResult = new ValidatorResult(false); -// validatorResult.setErrorDetails("Project ID is not set"); -// validationResults.addToValidationResultList(validatorResult); -// validationResults.setValidationState(false); -// } else if (experiment.getExperimentID() == null) { -// logger.error("This experiment is wrong, no experimentID set"); -// ValidatorResult validatorResult = new ValidatorResult(false); -// validatorResult.setErrorDetails("This experiment is wrong, no experimentID set"); -// validationResults.addToValidationResultList(validatorResult); -// validationResults.setValidationState(false); -// } -// return validationResults; -// } -//}
