Repository: airavata Updated Branches: refs/heads/master d89236975 -> 7812dcaf0
experiment execution for AIRAVATA-1652 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7812dcaf Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7812dcaf Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7812dcaf Branch: refs/heads/master Commit: 7812dcaf004d510e532e448a6053e8976ddc7154 Parents: d892369 Author: Chathuri Wimalasena <[email protected]> Authored: Tue Mar 31 15:24:31 2015 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Tue Mar 31 15:24:31 2015 -0400 ---------------------------------------------------------------------- .../test-suite/multi-tenanted-airavata/pom.xml | 10 + .../ApplicationRegister.java | 2 - .../ExperimentExecution.java | 325 +++++++++++++++++++ .../testsuite/multitenantedairavata/Setup.java | 2 +- .../utils/TestFrameworkConstants.java | 10 + .../main/resources/airavata-client.properties | 7 + 6 files changed, 353 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/7812dcaf/modules/test-suite/multi-tenanted-airavata/pom.xml ---------------------------------------------------------------------- diff --git a/modules/test-suite/multi-tenanted-airavata/pom.xml b/modules/test-suite/multi-tenanted-airavata/pom.xml index 21e157c..f974c59 100644 --- a/modules/test-suite/multi-tenanted-airavata/pom.xml +++ b/modules/test-suite/multi-tenanted-airavata/pom.xml @@ -51,6 +51,16 @@ </dependency> <dependency> <groupId>org.apache.airavata</groupId> + <artifactId>airavata-model-utils</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-messaging-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> <artifactId>airavata-data-models</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/airavata/blob/7812dcaf/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ApplicationRegister.java ---------------------------------------------------------------------- diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ApplicationRegister.java b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ApplicationRegister.java index b11e572..8e71958 100644 --- a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ApplicationRegister.java +++ b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ApplicationRegister.java @@ -77,8 +77,6 @@ public class ApplicationRegister { } } - - public void addApplications () throws Exception{ addAmberApplication(); addEchoApplication(); http://git-wip-us.apache.org/repos/asf/airavata/blob/7812dcaf/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java ---------------------------------------------------------------------- diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java new file mode 100644 index 0000000..b6b5476 --- /dev/null +++ b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java @@ -0,0 +1,325 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ + +package org.apache.airavata.testsuite.multitenantedairavata; + +import org.apache.airavata.api.Airavata; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; +import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.util.ExperimentModelUtil; +import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling; +import org.apache.airavata.model.workspace.experiment.Experiment; +import org.apache.airavata.model.workspace.experiment.ExperimentState; +import org.apache.airavata.model.workspace.experiment.UserConfigurationData; +import org.apache.airavata.testsuite.multitenantedairavata.utils.PropertyFileType; +import org.apache.airavata.testsuite.multitenantedairavata.utils.PropertyReader; +import org.apache.airavata.testsuite.multitenantedairavata.utils.TestFrameworkConstants; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ExperimentExecution { + private Airavata.Client airavata; + private final static Logger logger = LoggerFactory.getLogger(ExperimentExecution.class); + private Map<String, String> experimentsWithTokens; + private Map<String, String> experimentsWithGateway; + private Map<String, String> csTokens; + private Map<String, String> appInterfaceMap; + private Map<String, String> projectsMap; + private PropertyReader propertyReader; + + public ExperimentExecution(Airavata.Client airavata, + Map<String, String> tokenMap, + Map<String, String> appInterfaces, + Map<String, String> projectMap ) { + this.airavata = airavata; + this.csTokens = tokenMap; + this.appInterfaceMap = appInterfaces; + this.propertyReader = new PropertyReader(); + this.projectsMap = projectMap; + this.experimentsWithTokens = new HashMap<String, String>(); + this.experimentsWithGateway = new HashMap<String, String>(); + } + + public void launchExperiments () throws Exception { + try { + for (String expId : experimentsWithTokens.keySet()){ + airavata.launchExperiment(expId, experimentsWithTokens.get(expId)); + } + }catch (Exception e){ + logger.error("Error while launching experiment", e); + throw new Exception("Error while launching experiment", e); + } + } + + public void monitorExperiments () throws Exception { + String brokerUrl = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_BROKER_URL, PropertyFileType.AIRAVATA_CLIENT); + System.out.println("broker url " + brokerUrl); + final String exchangeName = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_EXCHANGE_NAME, PropertyFileType.AIRAVATA_CLIENT); + RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName); + for (final String expId : experimentsWithGateway.keySet()){ + final String gatewayId = experimentsWithGateway.get(expId); + consumer.listen(new MessageHandler() { + @Override + public Map<String, Object> getProperties() { + Map<String, Object> props = new HashMap<String, Object>(); + List<String> routingKeys = new ArrayList<String>(); + routingKeys.add(gatewayId); + routingKeys.add(gatewayId + "." + expId); + routingKeys.add(gatewayId + "." + expId + ".*"); + routingKeys.add(gatewayId + "." + expId + ".*.*"); + routingKeys.add(gatewayId + "." + expId + ".*.*.*"); + + props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); + return props; + } + + @Override + public void onMessage(MessageContext message) { + + if (message.getType().equals(MessageType.EXPERIMENT)){ + try { + ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + ExperimentState expState = event.getState(); + if (expState.equals(ExperimentState.COMPLETED)){ + // check file transfers + List<OutputDataObjectType> experimentOutputs = airavata.getExperimentOutputs(expId); + int i = 1; + for (OutputDataObjectType output : experimentOutputs){ + logger.info("################ Experiment : " + expId + " COMPLETES ###################"); + logger.info("Output " + i + " : " + output.getValue()); + i++; + } + } + logger.info(" Experiment Id : '" + expId + + "' with state : '" + event.getState().toString() + + " for Gateway " + event.getGatewayId()); + } catch (TException e) { + logger.error(e.getMessage(), e); + } + }else if (message.getType().equals(MessageType.JOB)){ + try { + JobStatusChangeEvent event = new JobStatusChangeEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + logger.info(" Job ID : '" + event.getJobIdentity().getJobId() + + "' with state : '" + event.getState().toString() + + " for Gateway " + event.getJobIdentity().getGatewayId()); + } catch (TException e) { + logger.error(e.getMessage(), e); + } + } + } + }); + } + + + } + + public void createAmberExperiment () throws Exception{ + try { + for (String gatewayId : csTokens.keySet()){ + String token = csTokens.get(gatewayId); + Map<String, String> appsWithNames = generateAppsPerGateway(gatewayId); + for (String appId : appsWithNames.keySet()){ + List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(appId); + List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(appId); + String appName = appsWithNames.get(appId); + if (appName.equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)){ + String heatRSTFile = propertyReader.readProperty(TestFrameworkConstants.AppcatalogConstants.AMBER_HEAT_RST_LOCATION, PropertyFileType.TEST_FRAMEWORK); + String prodInFile = propertyReader.readProperty(TestFrameworkConstants.AppcatalogConstants.AMBER_PROD_IN_LOCATION, PropertyFileType.TEST_FRAMEWORK); + String prmTopFile = propertyReader.readProperty(TestFrameworkConstants.AppcatalogConstants.AMBER_PRMTOP_LOCATION, PropertyFileType.TEST_FRAMEWORK); + + for (InputDataObjectType inputDataObjectType : applicationInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { + inputDataObjectType.setValue(heatRSTFile); + } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) { + inputDataObjectType.setValue(prodInFile); + } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) { + inputDataObjectType.setValue(prmTopFile); + } + } + + String projectId = getProjectIdForGateway(gatewayId); + Experiment simpleExperiment = + ExperimentModelUtil.createSimpleExperiment(projectId, "admin", "Amber Experiment", "Amber Experiment run", appId, applicationInputs); + simpleExperiment.setExperimentOutputs(appOutputs); + String experimentId; + Map<String, String> computeResources = airavata.getAvailableAppInterfaceComputeResources(appId); + if (computeResources != null && computeResources.size() != 0) { + for (String id : computeResources.keySet()) { + String resourceName = computeResources.get(id); + if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.TRESTLES_RESOURCE_NAME)) { + ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0, 1, null); + UserConfigurationData userConfigurationData = new UserConfigurationData(); + userConfigurationData.setAiravataAutoSchedule(false); + userConfigurationData.setOverrideManualScheduledParams(false); + userConfigurationData.setComputationalResourceScheduling(scheduling); + simpleExperiment.setUserConfigurationData(userConfigurationData); + experimentId = airavata.createExperiment(gatewayId, simpleExperiment); + experimentsWithTokens.put(experimentId, token); + experimentsWithGateway.put(experimentId, gatewayId); + }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) { + ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0, 1, null); + UserConfigurationData userConfigurationData = new UserConfigurationData(); + userConfigurationData.setAiravataAutoSchedule(false); + userConfigurationData.setOverrideManualScheduledParams(false); + userConfigurationData.setComputationalResourceScheduling(scheduling); + simpleExperiment.setUserConfigurationData(userConfigurationData); + experimentId = airavata.createExperiment(gatewayId, simpleExperiment); + experimentsWithTokens.put(experimentId, token); + experimentsWithGateway.put(experimentId, gatewayId); + }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.BR2_RESOURCE_NAME)) { + ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0, 1, null); + UserConfigurationData userConfigurationData = new UserConfigurationData(); + userConfigurationData.setAiravataAutoSchedule(false); + userConfigurationData.setOverrideManualScheduledParams(false); + userConfigurationData.setComputationalResourceScheduling(scheduling); + simpleExperiment.setUserConfigurationData(userConfigurationData); + experimentId = airavata.createExperiment(gatewayId, simpleExperiment); + experimentsWithTokens.put(experimentId, token); + experimentsWithGateway.put(experimentId, gatewayId); + } + } + } + } + } + } + }catch (Exception e){ + logger.error("Error while creating AMBEr experiment", e); + throw new Exception("Error while creating AMBER experiment", e); + } + } + + public String getProjectIdForGateway (String gatewayId){ + for (String projectId : projectsMap.keySet()){ + String gateway = projectsMap.get(projectId); + if (gateway.equals(gatewayId)){ + return projectId; + } + } + return null; + } + + public Map<String, String> generateAppsPerGateway (String gatewayId) throws Exception { + Map<String, String> appWithNames = new HashMap<String, String>(); + try { + for (String appId : appInterfaceMap.keySet()){ + String gateway = appInterfaceMap.get(appId); + ApplicationInterfaceDescription applicationInterface = airavata.getApplicationInterface(appId); + if (gateway.equals(gatewayId)){ + appWithNames.put(appId, applicationInterface.getApplicationName()); + } + } + }catch (Exception e){ + logger.error("Error while getting application interface", e); + throw new Exception("Error while getting application interface", e); + } + + return appWithNames; + } + + public void createEchoExperiment () throws Exception{ + try { + for (String gatewayId : csTokens.keySet()) { + String token = csTokens.get(gatewayId); + Map<String, String> appsWithNames = generateAppsPerGateway(gatewayId); + for (String appId : appsWithNames.keySet()) { + List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(appId); + List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(appId); + String appName = appsWithNames.get(appId); + if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ECHO_NAME)) { + for (InputDataObjectType inputDataObjectType : applicationInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("input_to_Echo")) { + inputDataObjectType.setValue("Hello World !!!"); + } + } + + String projectId = getProjectIdForGateway(gatewayId); + Experiment simpleExperiment = + ExperimentModelUtil.createSimpleExperiment(projectId, "admin", "Echo Experiment", "Echo Experiment run", appId, applicationInputs); + simpleExperiment.setExperimentOutputs(appOutputs); + String experimentId; + Map<String, String> computeResources = airavata.getAvailableAppInterfaceComputeResources(appId); + if (computeResources != null && computeResources.size() != 0) { + for (String id : computeResources.keySet()) { + String resourceName = computeResources.get(id); + if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.TRESTLES_RESOURCE_NAME)) { + ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0, 1, null); + UserConfigurationData userConfigurationData = new UserConfigurationData(); + userConfigurationData.setAiravataAutoSchedule(false); + userConfigurationData.setOverrideManualScheduledParams(false); + userConfigurationData.setComputationalResourceScheduling(scheduling); + simpleExperiment.setUserConfigurationData(userConfigurationData); + experimentId = airavata.createExperiment(gatewayId, simpleExperiment); + experimentsWithTokens.put(experimentId, token); + experimentsWithGateway.put(experimentId, gatewayId); + } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) { + ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0, 1, null); + UserConfigurationData userConfigurationData = new UserConfigurationData(); + userConfigurationData.setAiravataAutoSchedule(false); + userConfigurationData.setOverrideManualScheduledParams(false); + userConfigurationData.setComputationalResourceScheduling(scheduling); + simpleExperiment.setUserConfigurationData(userConfigurationData); + experimentId = airavata.createExperiment(gatewayId, simpleExperiment); + experimentsWithTokens.put(experimentId, token); + experimentsWithGateway.put(experimentId, gatewayId); + } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.BR2_RESOURCE_NAME)) { + ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0, 1, null); + UserConfigurationData userConfigurationData = new UserConfigurationData(); + userConfigurationData.setAiravataAutoSchedule(false); + userConfigurationData.setOverrideManualScheduledParams(false); + userConfigurationData.setComputationalResourceScheduling(scheduling); + simpleExperiment.setUserConfigurationData(userConfigurationData); + experimentId = airavata.createExperiment(gatewayId, simpleExperiment); + experimentsWithTokens.put(experimentId, token); + experimentsWithGateway.put(experimentId, gatewayId); + } + } + } + } + } + } + }catch (Exception e){ + logger.error("Error while creating Echo experiment", e); + throw new Exception("Error while creating Echo experiment", e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7812dcaf/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/Setup.java ---------------------------------------------------------------------- diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/Setup.java b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/Setup.java index 4cfdab2..9b94d4e 100644 --- a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/Setup.java +++ b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/Setup.java @@ -107,7 +107,7 @@ public class Setup { project.setName("testProj_" + gatewayId); project.setOwner("testUser_" + gatewayId); String projectId = airavata.createProject(gatewayId, project); - projectMap.put(gatewayId, projectId); + projectMap.put(projectId, gatewayId); } public void registerSSHKeys () throws Exception{ http://git-wip-us.apache.org/repos/asf/airavata/blob/7812dcaf/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/TestFrameworkConstants.java ---------------------------------------------------------------------- diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/TestFrameworkConstants.java b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/TestFrameworkConstants.java index c4bf8e4..f64ce67 100644 --- a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/TestFrameworkConstants.java +++ b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/TestFrameworkConstants.java @@ -32,6 +32,8 @@ public class TestFrameworkConstants { public static final String CS_JBDC_DRIVER = "credential.store.jdbc.driver"; public static final String CS_DB_USERNAME = "credential.store.jdbc.user"; public static final String CS_DB_PWD = "credential.store.jdbc.password"; + public static final String RABBIT_BROKER_URL = "rabbitmq.broker.url"; + public static final String RABBIT_EXCHANGE_NAME = "rabbitmq.exchange.name"; } public static final class FrameworkPropertiesConstants { @@ -81,6 +83,14 @@ public class TestFrameworkConstants { public static final String STAMPEDE_RESOURCE_NAME = "stampede.tacc.xsede.org"; public static final String TRESTLES_RESOURCE_NAME = "trestles.sdsc.xsede.org"; public static final String BR2_RESOURCE_NAME = "bigred2.uits.iu.edu"; + + + public static final String AMBER_HEAT_RST_LOCATION = "02_Heat.rst_location"; + public static final String AMBER_PROD_IN_LOCATION = "03_Prod.in_location"; + public static final String AMBER_PRMTOP_LOCATION = "prmtop_location"; + + + } public static final class CredentialStoreConstants { http://git-wip-us.apache.org/repos/asf/airavata/blob/7812dcaf/modules/test-suite/multi-tenanted-airavata/src/main/resources/airavata-client.properties ---------------------------------------------------------------------- diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/resources/airavata-client.properties b/modules/test-suite/multi-tenanted-airavata/src/main/resources/airavata-client.properties index 4513c21..c44bd1f 100644 --- a/modules/test-suite/multi-tenanted-airavata/src/main/resources/airavata-client.properties +++ b/modules/test-suite/multi-tenanted-airavata/src/main/resources/airavata-client.properties @@ -45,4 +45,11 @@ credential.store.jdbc.password=airavata credential.store.jdbc.driver=org.apache.derby.jdbc.ClientDriver +###################################################################### +# rabbitMQ related properties - monitoring is done with rabbitMQ +###################################################################### +rabbitmq.exchange.name=airavata_rabbitmq_exchange +rabbitmq.broker.url=amqp://localhost:5672 + +
