Repository: airavata Updated Branches: refs/heads/queue-gfac-rabbitmq 48be39fea -> cb6c4ccf2
adding more fixes to worker based submission Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/cb6c4ccf Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/cb6c4ccf Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/cb6c4ccf Branch: refs/heads/queue-gfac-rabbitmq Commit: cb6c4ccf267c165e64d6858584b2300eaaa37df4 Parents: 48be39f Author: Lahiru Gunathilake <[email protected]> Authored: Thu Mar 19 16:08:29 2015 -0400 Committer: Lahiru Gunathilake <[email protected]> Committed: Thu Mar 19 16:08:29 2015 -0400 ---------------------------------------------------------------------- .../airavata/api/server/AiravataAPIServer.java | 4 +- .../client/samples/CreateLaunchExperiment.java | 432 ++++++++++--------- .../airavata/common/utils/AiravataZKUtils.java | 5 + .../apache/airavata/common/utils/Constants.java | 1 + .../main/resources/airavata-server.properties | 10 +- .../main/resources/airavata-server.properties | 1 + .../airavata/gfac/server/GfacServerHandler.java | 14 +- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 5 +- .../core/monitor/GfacInternalStatusUpdator.java | 32 +- .../airavata/gfac/core/monitor/MonitorID.java | 3 +- .../airavata/gfac/core/utils/GFacUtils.java | 127 +++--- .../gfac/local/provider/impl/LocalProvider.java | 4 +- .../airavata/gfac/monitor/util/CommonUtils.java | 2 +- .../core/impl/RabbitMQTaskLaunchConsumer.java | 5 +- .../server/OrchestratorServerHandler.java | 4 +- .../util/OrchestratorRecoveryHandler.java | 2 +- .../core/impl/GFACPassiveJobSubmitter.java | 44 +- .../core/impl/GFACRPCJobSubmitter.java | 4 +- 18 files changed, 369 insertions(+), 330 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java index 2556df1..159e0e4 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java @@ -127,7 +127,7 @@ public class AiravataAPIServer implements IServer, Watcher{ String rabbitMqBrokerURL = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.RABBITMQ_BROKER_URL); String rabbitMqExchange = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.RABBITMQ_EXCHANGE); String rabbitMq = rabbitMqBrokerURL + File.separator + rabbitMqExchange; - zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is required, this will only use to store some data + zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); // no watcher is required, this will only use to store some data String apiServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_API_SERVER_NODE, "/airavata-server"); String OrchServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE, "/orchestrator-server"); String gfacServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); @@ -309,7 +309,7 @@ public class AiravataAPIServer implements IServer, Watcher{ case Expired:case Disconnected: try { mutex = -1; - zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this); + zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this); synchronized (mutex) { mutex.wait(); // waiting for the syncConnected event } http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index f499345..231da87 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -52,13 +52,13 @@ public class CreateLaunchExperiment { public static final int THRIFT_SERVER_PORT = 8930; // public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org"; // public static final int THRIFT_SERVER_PORT = 9930; - + private final static Logger logger = LoggerFactory.getLogger(CreateLaunchExperiment.class); private static final String DEFAULT_USER = "default.registry.user"; private static final String DEFAULT_GATEWAY = "php_reference_gateway"; private static Airavata.Client airavataClient; - private static String echoAppId = "Echo_1365a7fd-eae1-4575-b447-99afb4d79c82"; + private static String echoAppId = "Echo_78c785c6-3748-4a93-8569-19fee56581bc"; private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9"; private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762"; private static String amberAppId = "Amber_aa083c86-4680-4002-b3ef-fad93c181926"; @@ -78,27 +78,26 @@ public class CreateLaunchExperiment { private static String gatewayId; - // unicore service endpoint url + // unicore service endpoint url private static final String unicoreEndPointURL = "https://fsd-cloud15.zam.kfa-juelich.de:7000/INTEROP1/services/BESFactory?res=default_bes_factory"; - - + + public static void main(String[] args) throws Exception { airavataClient = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT); System.out.println("API version is " + airavataClient.getAPIVersion()); - getAvailableAppInterfaceComputeResources("Echo_4fb76cb3-6bf6-409e-aa17-7cc7ee6e41af"); // createGateway(); // getGateway("testGatewayId"); // registerApplications(); // run this only the first time -// createAndLaunchExp(); + createAndLaunchExp(); } - + private static String fsdResourceId; public static void getAvailableAppInterfaceComputeResources(String appInterfaceId) { try { Map<String, String> availableAppInterfaceComputeResources = airavataClient.getAvailableAppInterfaceComputeResources(appInterfaceId); - for (String key : availableAppInterfaceComputeResources.keySet()){ + for (String key : availableAppInterfaceComputeResources.keySet()) { System.out.println("id : " + key); System.out.println("name : " + availableAppInterfaceComputeResources.get(key)); } @@ -115,7 +114,7 @@ public class CreateLaunchExperiment { } - public static void createGateway(){ + public static void createGateway() { try { Gateway gateway = new Gateway(); gateway.setGatewayId("testGatewayId2"); @@ -134,14 +133,14 @@ public class CreateLaunchExperiment { } - public static void getGateway(String gatewayId){ + public static void getGateway(String gatewayId) { try { Gateway gateway = airavataClient.getGateway(gatewayId); gateway.setDomain("testDomain"); airavataClient.updateGateway(gatewayId, gateway); List<Gateway> allGateways = airavataClient.getAllGateways(); System.out.println(allGateways.size()); - if (airavataClient.isGatewayExist(gatewayId)){ + if (airavataClient.isGatewayExist(gatewayId)) { Gateway gateway1 = airavataClient.getGateway(gatewayId); System.out.println(gateway1.getGatewayName()); } @@ -161,10 +160,9 @@ public class CreateLaunchExperiment { public static void createAndLaunchExp() throws TException { -// final String expId = createEchoExperimentForFSD(airavataClient); List<String> experimentIds = new ArrayList<String>(); try { - for (int i = 0; i < 1; i++) { + for (int i = 0; i < 100; i++) { // final String expId = createExperimentForSSHHost(airavata); // final String expId = createEchoExperimentForFSD(airavataClient); // final String expId = createMPIExperimentForFSD(airavataClient); @@ -192,11 +190,11 @@ public class CreateLaunchExperiment { launchExperiment(airavataClient, expId); } - Thread.sleep(100); - for (String exId : experimentIds) { - Experiment experiment = airavataClient.getExperiment(exId); - System.out.println(experiment.getExperimentStatus().toString()); - } + Thread.sleep(10000); + for (String exId : experimentIds) { + Experiment experiment = airavataClient.getExperiment(exId); + System.out.println(experiment.getExperimentID() + " " + experiment.getExperimentStatus().getExperimentState().name()); + } } catch (Exception e) { @@ -206,8 +204,7 @@ public class CreateLaunchExperiment { } } - - + public static void registerApplications() { RegisterSampleApplications registerSampleApplications = new RegisterSampleApplications(airavataClient); @@ -233,25 +230,25 @@ public class CreateLaunchExperiment { } public static String registerUnicoreEndpoint(String hostName, String hostDesc, JobSubmissionProtocol protocol, SecurityProtocol securityProtocol) throws TException { - - ComputeResourceDescription computeResourceDescription = RegisterSampleApplicationsUtils - .createComputeResourceDescription(hostName, hostDesc, null, null); - - fsdResourceId = airavataClient.registerComputeResource(computeResourceDescription); - - if (fsdResourceId.isEmpty()) - throw new AiravataClientException(); - - System.out.println("FSD Compute ResourceID: "+fsdResourceId); - - JobSubmissionInterface jobSubmission = RegisterSampleApplicationsUtils.createJobSubmissionInterface(fsdResourceId, protocol, 2); - UnicoreJobSubmission ucrJobSubmission = new UnicoreJobSubmission(); - ucrJobSubmission.setSecurityProtocol(securityProtocol); - ucrJobSubmission.setUnicoreEndPointURL(unicoreEndPointURL); - - return jobSubmission.getJobSubmissionInterfaceId(); - } - + + ComputeResourceDescription computeResourceDescription = RegisterSampleApplicationsUtils + .createComputeResourceDescription(hostName, hostDesc, null, null); + + fsdResourceId = airavataClient.registerComputeResource(computeResourceDescription); + + if (fsdResourceId.isEmpty()) + throw new AiravataClientException(); + + System.out.println("FSD Compute ResourceID: " + fsdResourceId); + + JobSubmissionInterface jobSubmission = RegisterSampleApplicationsUtils.createJobSubmissionInterface(fsdResourceId, protocol, 2); + UnicoreJobSubmission ucrJobSubmission = new UnicoreJobSubmission(); + ucrJobSubmission.setSecurityProtocol(securityProtocol); + ucrJobSubmission.setUnicoreEndPointURL(unicoreEndPointURL); + + return jobSubmission.getJobSubmissionInterfaceId(); + } + public static String createEchoExperimentForTrestles(Airavata.Client client) throws TException { try { List<InputDataObjectType> exInputs = client.getApplicationInputs(echoAppId); @@ -299,28 +296,27 @@ public class CreateLaunchExperiment { } return null; } - - + + public static String createEchoExperimentForFSD(Airavata.Client client) throws TException { try { - List<InputDataObjectType> exInputs = client.getApplicationInputs(echoAppId); + List<InputDataObjectType> exInputs = client.getApplicationInputs(echoAppId); for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo")) { - inputDataObjectType.setValue("Hello World"); - }else if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo2")) { + if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo")) { + inputDataObjectType.setValue("Hello World"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo2")) { inputDataObjectType.setValue("http://www.textfiles.com/100/ad.txt"); - }else if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo3")) { + } else if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo3")) { inputDataObjectType.setValue("file:///tmp/test.txt"); - } + } } List<OutputDataObjectType> exOut = client.getApplicationOutputs(echoAppId); - Experiment simpleExperiment = + Experiment simpleExperiment = ExperimentModelUtil.createSimpleExperiment("default", "admin", "echoExperiment", "SimpleEcho2", echoAppId, exInputs); simpleExperiment.setExperimentOutputs(exOut); - - - + + Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(echoAppId); if (computeResources != null && computeResources.size() != 0) { for (String id : computeResources.keySet()) { @@ -331,13 +327,13 @@ public class CreateLaunchExperiment { userConfigurationData.setAiravataAutoSchedule(false); userConfigurationData.setOverrideManualScheduledParams(false); userConfigurationData.setComputationalResourceScheduling(scheduling); - + // set output directory AdvancedOutputDataHandling dataHandling = new AdvancedOutputDataHandling(); - dataHandling.setOutputDataDir("/tmp/airavata/output/"+UUID.randomUUID().toString()+"/"); + dataHandling.setOutputDataDir("/tmp/airavata/output/" + UUID.randomUUID().toString() + "/"); userConfigurationData.setAdvanceOutputDataHandling(dataHandling); simpleExperiment.setUserConfigurationData(userConfigurationData); - + return client.createExperiment(DEFAULT_GATEWAY, simpleExperiment); } } @@ -357,25 +353,24 @@ public class CreateLaunchExperiment { } return null; } - - + + public static String createMPIExperimentForFSD(Airavata.Client client) throws TException { try { - List<InputDataObjectType> exInputs = client.getApplicationInputs(mpiAppId); + List<InputDataObjectType> exInputs = client.getApplicationInputs(mpiAppId); for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("Sample_Input")) { - inputDataObjectType.setValue(""); - } + if (inputDataObjectType.getName().equalsIgnoreCase("Sample_Input")) { + inputDataObjectType.setValue(""); + } } List<OutputDataObjectType> exOut = client.getApplicationOutputs(mpiAppId); - - Experiment simpleExperiment = + + Experiment simpleExperiment = ExperimentModelUtil.createSimpleExperiment("default", "admin", "mpiExperiment", "HelloMPI", mpiAppId, null); - simpleExperiment.setExperimentOutputs(exOut); - - - + simpleExperiment.setExperimentOutputs(exOut); + + Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(mpiAppId); if (computeResources != null && computeResources.size() != 0) { for (String id : computeResources.keySet()) { @@ -386,13 +381,13 @@ public class CreateLaunchExperiment { userConfigurationData.setAiravataAutoSchedule(false); userConfigurationData.setOverrideManualScheduledParams(false); userConfigurationData.setComputationalResourceScheduling(scheduling); - + // set output directory AdvancedOutputDataHandling dataHandling = new AdvancedOutputDataHandling(); - dataHandling.setOutputDataDir("/tmp/airavata/output/"+UUID.randomUUID().toString()+"/"); + dataHandling.setOutputDataDir("/tmp/airavata/output/" + UUID.randomUUID().toString() + "/"); userConfigurationData.setAdvanceOutputDataHandling(dataHandling); simpleExperiment.setUserConfigurationData(userConfigurationData); - + return client.createExperiment(DEFAULT_GATEWAY, simpleExperiment); } } @@ -413,15 +408,13 @@ public class CreateLaunchExperiment { return null; } - - - + public static String createExperimentWRFStampede(Airavata.Client client) throws TException { try { - List<InputDataObjectType> exInputs = client.getApplicationInputs(wrfAppId); + List<InputDataObjectType> exInputs = client.getApplicationInputs(wrfAppId); setWRFInputs(exInputs); List<OutputDataObjectType> exOut = client.getApplicationOutputs(wrfAppId); - + Experiment simpleExperiment = ExperimentModelUtil.createSimpleExperiment("default", "admin", "WRFExperiment", "Testing", wrfAppId, exInputs); simpleExperiment.setExperimentOutputs(exOut); @@ -458,26 +451,26 @@ public class CreateLaunchExperiment { } - private static void setWRFInputs(List<InputDataObjectType> exInputs) { - for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("Config_Namelist_File")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/WRF_FILES/namelist.input"); - }else if (inputDataObjectType.getName().equalsIgnoreCase("WRF_Initial_Conditions")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/WRF_FILES/wrfinput_d01"); - }else if (inputDataObjectType.getName().equalsIgnoreCase("WRF_Boundary_File")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/WRF_FILES/wrfbdy_d01"); - } - } - } + private static void setWRFInputs(List<InputDataObjectType> exInputs) { + for (InputDataObjectType inputDataObjectType : exInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("Config_Namelist_File")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/WRF_FILES/namelist.input"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("WRF_Initial_Conditions")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/WRF_FILES/wrfinput_d01"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("WRF_Boundary_File")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/WRF_FILES/wrfbdy_d01"); + } + } + } public static String createExperimentGROMACSStampede(Airavata.Client client) throws TException { try { - - List<InputDataObjectType> exInputs = client.getApplicationInputs(gromacsAppId); - setGROMACSInputs(exInputs); + + List<InputDataObjectType> exInputs = client.getApplicationInputs(gromacsAppId); + setGROMACSInputs(exInputs); List<OutputDataObjectType> exOut = client.getApplicationOutputs(gromacsAppId); - + Experiment simpleExperiment = ExperimentModelUtil.createSimpleExperiment("default", "admin", "GromacsExperiment", "Testing", gromacsAppId, exInputs); simpleExperiment.setExperimentOutputs(exOut); @@ -514,20 +507,21 @@ public class CreateLaunchExperiment { } private static void setGROMACSInputs(List<InputDataObjectType> exInputs) { - for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("GROMOS_Coordinate_File")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/GROMMACS_FILES/pdb1y6l-EM-vacuum.gro"); - }else if (inputDataObjectType.getName().equalsIgnoreCase("Portable_Input_Binary_File")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/GROMMACS_FILES/pdb1y6l-EM-vacuum.tpr"); - } - } - } + for (InputDataObjectType inputDataObjectType : exInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("GROMOS_Coordinate_File")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/GROMMACS_FILES/pdb1y6l-EM-vacuum.gro"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("Portable_Input_Binary_File")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/GROMMACS_FILES/pdb1y6l-EM-vacuum.tpr"); + } + } + } + public static String createExperimentESPRESSOStampede(Airavata.Client client) throws TException { try { - List<InputDataObjectType> exInputs = client.getApplicationInputs(espressoAppId); - setESPRESSOInputs(exInputs); + List<InputDataObjectType> exInputs = client.getApplicationInputs(espressoAppId); + setESPRESSOInputs(exInputs); List<OutputDataObjectType> exOut = client.getApplicationOutputs(espressoAppId); - + Experiment simpleExperiment = ExperimentModelUtil.createSimpleExperiment("default", "admin", "EspressoExperiment", "Testing", espressoAppId, exInputs); simpleExperiment.setExperimentOutputs(exOut); @@ -562,22 +556,23 @@ public class CreateLaunchExperiment { } return null; } + private static void setESPRESSOInputs(List<InputDataObjectType> exInputs) { - for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("AI_Pseudopotential_File")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/ESPRESSO_FILES/Al.sample.in"); - }else if (inputDataObjectType.getName().equalsIgnoreCase("AI_Primitive_Cell")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/ESPRESSO_FILES/Al.pz-vbc.UPF"); - } - } -} + for (InputDataObjectType inputDataObjectType : exInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("AI_Pseudopotential_File")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/ESPRESSO_FILES/Al.sample.in"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("AI_Primitive_Cell")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/ESPRESSO_FILES/Al.pz-vbc.UPF"); + } + } + } public static String createExperimentTRINITYStampede(Airavata.Client client) throws TException { try { - List<InputDataObjectType> exInputs = client.getApplicationInputs(trinityAppId); - setTRINITYInputs(exInputs); + List<InputDataObjectType> exInputs = client.getApplicationInputs(trinityAppId); + setTRINITYInputs(exInputs); List<OutputDataObjectType> exOut = client.getApplicationOutputs(trinityAppId); - + Experiment simpleExperiment = ExperimentModelUtil.createSimpleExperiment("default", "admin", "TrinityExperiment", "Testing", trinityAppId, exInputs); simpleExperiment.setExperimentOutputs(exOut); @@ -612,21 +607,23 @@ public class CreateLaunchExperiment { } return null; } + private static void setTRINITYInputs(List<InputDataObjectType> exInputs) { - for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("RNA_Seq_Left_Input")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/TRINITY_FILES/reads.left.fq"); - }else if (inputDataObjectType.getName().equalsIgnoreCase("RNA_Seq_Right_Input")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/TRINITY_FILES/reads.right.fq"); - } - } + for (InputDataObjectType inputDataObjectType : exInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("RNA_Seq_Left_Input")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/TRINITY_FILES/reads.left.fq"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("RNA_Seq_Right_Input")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/TRINITY_FILES/reads.right.fq"); + } + } } + public static String createExperimentLAMMPSStampede(Airavata.Client client) throws TException { try { - List<InputDataObjectType> exInputs = client.getApplicationInputs(lammpsAppId); - setLAMMPSInputs(exInputs); + List<InputDataObjectType> exInputs = client.getApplicationInputs(lammpsAppId); + setLAMMPSInputs(exInputs); List<OutputDataObjectType> exOut = client.getApplicationOutputs(lammpsAppId); - + Experiment simpleExperiment = ExperimentModelUtil.createSimpleExperiment("default", "admin", "LAMMPSExperiment", "Testing", lammpsAppId, exInputs); simpleExperiment.setExperimentOutputs(exOut); @@ -661,17 +658,19 @@ public class CreateLaunchExperiment { } return null; } + private static void setLAMMPSInputs(List<InputDataObjectType> exInputs) { - for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("Friction_Simulation_Input")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/LAMMPS_FILES/in.friction"); - } - } + for (InputDataObjectType inputDataObjectType : exInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("Friction_Simulation_Input")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/LAMMPS_FILES/in.friction"); + } + } } + public static String createExperimentNWCHEMStampede(Airavata.Client client) throws TException { try { - List<InputDataObjectType> exInputs = client.getApplicationInputs(nwchemAppId); - setNWCHEMInputs(exInputs); + List<InputDataObjectType> exInputs = client.getApplicationInputs(nwchemAppId); + setNWCHEMInputs(exInputs); List<OutputDataObjectType> exOut = client.getApplicationOutputs(nwchemAppId); Experiment simpleExperiment = @@ -708,17 +707,19 @@ public class CreateLaunchExperiment { } return null; } + private static void setNWCHEMInputs(List<InputDataObjectType> exInputs) { - for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("Water_Molecule_Input")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/NWCHEM_FILES/water.nw"); - } - } + for (InputDataObjectType inputDataObjectType : exInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("Water_Molecule_Input")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/NWCHEM_FILES/water.nw"); + } + } } + public static String createExperimentAUTODOCKStampede(Airavata.Client client) throws TException { try { - List<InputDataObjectType> exInputs = client.getApplicationInputs(nwchemAppId); - setAUTODOCKInputs(exInputs); + List<InputDataObjectType> exInputs = client.getApplicationInputs(nwchemAppId); + setAUTODOCKInputs(exInputs); List<OutputDataObjectType> exOut = client.getApplicationOutputs(nwchemAppId); Experiment simpleExperiment = @@ -755,43 +756,45 @@ public class CreateLaunchExperiment { } return null; } + private static void setAUTODOCKInputs(List<InputDataObjectType> exInputs) { - for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("AD4_parameters.dat")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/AD4_parameters.dat"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.A.map")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.A.map"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.C.map")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.C.map"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.d.map")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.d.map"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.e.map")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.e.map"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.HD.map")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.HD.map"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.maps.fld")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.maps.fld"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.NA.map")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.NA.map"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.N.map")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.N.map"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.OA.map")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.OA.map"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("ind.dpf")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/ind.dpf"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("ind.pdbqt")) { - inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/ind.pdbqt"); - } - } + for (InputDataObjectType inputDataObjectType : exInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("AD4_parameters.dat")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/AD4_parameters.dat"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.A.map")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.A.map"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.C.map")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.C.map"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.d.map")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.d.map"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.e.map")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.e.map"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.HD.map")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.HD.map"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.maps.fld")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.maps.fld"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.NA.map")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.NA.map"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.N.map")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.N.map"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.OA.map")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.OA.map"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("ind.dpf")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/ind.dpf"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("ind.pdbqt")) { + inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/ind.pdbqt"); + } + } } + public static String createExperimentWRFTrestles(Airavata.Client client) throws TException { try { - List<InputDataObjectType> exInputs = client.getApplicationInputs(wrfAppId); + List<InputDataObjectType> exInputs = client.getApplicationInputs(wrfAppId); setWRFInputs(exInputs); List<OutputDataObjectType> exOut = client.getApplicationOutputs(wrfAppId); - - Experiment simpleExperiment = + + Experiment simpleExperiment = ExperimentModelUtil.createSimpleExperiment("default", "admin", "WRFExperiment", "Testing", wrfAppId, exInputs); simpleExperiment.setExperimentOutputs(exOut); @@ -952,11 +955,11 @@ public class CreateLaunchExperiment { try { List<InputDataObjectType> exInputs = client.getApplicationInputs(echoAppId); for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo")) { - inputDataObjectType.setValue("Hello World"); - } - } - List<OutputDataObjectType> exOut = client.getApplicationOutputs(echoAppId); + if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo")) { + inputDataObjectType.setValue("Hello World"); + } + } + List<OutputDataObjectType> exOut = client.getApplicationOutputs(echoAppId); Project project = ProjectModelUtil.createProject("default", "admin", "test project"); String projectId = client.createProject(DEFAULT_GATEWAY, project); @@ -1068,13 +1071,13 @@ public class CreateLaunchExperiment { public static String createExperimentForBR2(Airavata.Client client) throws TException { try { - List<InputDataObjectType> exInputs = client.getApplicationInputs(echoAppId); - for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo")) { - inputDataObjectType.setValue("Hello World"); - } - } - List<OutputDataObjectType> exOut = client.getApplicationOutputs(echoAppId); + List<InputDataObjectType> exInputs = client.getApplicationInputs(echoAppId); + for (InputDataObjectType inputDataObjectType : exInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo")) { + inputDataObjectType.setValue("Hello World"); + } + } + List<OutputDataObjectType> exOut = client.getApplicationOutputs(echoAppId); Project project = ProjectModelUtil.createProject("default", "lahiru", "test project"); @@ -1163,6 +1166,7 @@ public class CreateLaunchExperiment { } return null; } + public static String createExperimentLAMMPSForLSF(Airavata.Client client) throws TException { try { List<InputDataObjectType> exInputs = client.getApplicationInputs(lammpsAppId); @@ -1225,7 +1229,7 @@ public class CreateLaunchExperiment { public static String createExperimentForBR2Amber(Airavata.Client client) throws TException { try { - List<InputDataObjectType> exInputs = client.getApplicationInputs(amberAppId); + List<InputDataObjectType> exInputs = client.getApplicationInputs(amberAppId); // for (InputDataObjectType inputDataObjectType : exInputs) { // if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { // inputDataObjectType.setValue("/Users/raminder/Documents/Sample/Amber/02_Heat.rst"); @@ -1236,17 +1240,17 @@ public class CreateLaunchExperiment { // } // // } - for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { - inputDataObjectType.setValue("file://[email protected]:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/02_Heat.rst"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) { - inputDataObjectType.setValue("file://[email protected]:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/03_Prod.in"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) { - inputDataObjectType.setValue("file://[email protected]:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/prmtop"); - } - } - - List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId); + for (InputDataObjectType inputDataObjectType : exInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { + inputDataObjectType.setValue("file://[email protected]:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/02_Heat.rst"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) { + inputDataObjectType.setValue("file://[email protected]:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/03_Prod.in"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) { + inputDataObjectType.setValue("file://[email protected]:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/prmtop"); + } + } + + List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId); Project project = ProjectModelUtil.createProject("default", "admin", "test project"); String projectId = client.createProject(DEFAULT_GATEWAY, project); @@ -1289,7 +1293,7 @@ public class CreateLaunchExperiment { public static String createExperimentForStampedeAmber(Airavata.Client client) throws TException { try { - List<InputDataObjectType> exInputs = client.getApplicationInputs(amberAppId); + List<InputDataObjectType> exInputs = client.getApplicationInputs(amberAppId); // for (InputDataObjectType inputDataObjectType : exInputs) { // if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { // inputDataObjectType.setValue("/Users/raminder/Documents/Sample/Amber/02_Heat.rst"); @@ -1300,17 +1304,17 @@ public class CreateLaunchExperiment { // } // // } - for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { - inputDataObjectType.setValue("file://[email protected]:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/02_Heat.rst"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) { - inputDataObjectType.setValue("file://[email protected]:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/03_Prod.in"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) { - inputDataObjectType.setValue("file://[email protected]:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/prmtop"); - } - } + for (InputDataObjectType inputDataObjectType : exInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { + inputDataObjectType.setValue("file://[email protected]:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/02_Heat.rst"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) { + inputDataObjectType.setValue("file://[email protected]:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/03_Prod.in"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) { + inputDataObjectType.setValue("file://[email protected]:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/prmtop"); + } + } - List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId); + List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId); Project project = ProjectModelUtil.createProject("default", "admin", "test project"); @@ -1353,8 +1357,8 @@ public class CreateLaunchExperiment { public static String createExperimentForTrestlesAmber(Airavata.Client client) throws TException { try { - - List<InputDataObjectType> exInputs = client.getApplicationInputs(amberAppId); + + List<InputDataObjectType> exInputs = client.getApplicationInputs(amberAppId); // for (InputDataObjectType inputDataObjectType : exInputs) { // if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { // inputDataObjectType.setValue("/Users/raminder/Documents/Sample/Amber/02_Heat.rst"); @@ -1365,16 +1369,16 @@ public class CreateLaunchExperiment { // } // // } - for (InputDataObjectType inputDataObjectType : exInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { - inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/02_Heat.rst"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) { - inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/03_Prod.in"); - } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) { - inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/prmtop"); - } - } - List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId); + for (InputDataObjectType inputDataObjectType : exInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { + inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/02_Heat.rst"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) { + inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/03_Prod.in"); + } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) { + inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/prmtop"); + } + } + List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId); Project project = ProjectModelUtil.createProject("default", "admin", "test project"); String projectId = client.createProject(DEFAULT_GATEWAY, project); @@ -1418,7 +1422,7 @@ public class CreateLaunchExperiment { public static void launchExperiment(Airavata.Client client, String expId) throws TException { try { - String tokenId ="-0bbb-403b-a88a-42b6dbe198e9"; + String tokenId = "-0bbb-403b-a88a-42b6dbe198e9"; client.launchExperiment(expId, tokenId); } catch (ExperimentNotFoundException e) { logger.error("Error occured while launching the experiment...", e.getMessage()); http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java index 46f06f1..a0cc142 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java @@ -64,6 +64,10 @@ public class AiravataZKUtils { + ":" + ServerSettings.getSetting(Constants.ZOOKEEPER_SERVER_PORT,"2181"); } + public static int getZKTimeout()throws ApplicationSettingsException { + return Integer.parseInt(ServerSettings.getSetting(Constants.ZOOKEEPER_TIMEOUT,"30000")); + } + public static String getExpStatePath(String experimentId, String taskId) throws ApplicationSettingsException { return AiravataZKUtils.getExpZnodePath(experimentId, taskId) + File.separator + @@ -88,6 +92,7 @@ public class AiravataZKUtils { return null; } + public static int getExpStateValueWithGivenPath(ZooKeeper zk,String fullPath)throws ApplicationSettingsException, KeeperException, InterruptedException { Stat exists = zk.exists(fullPath, false); http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java index 154bea1..391a3c6 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java @@ -57,4 +57,5 @@ public final class Constants { public static final String ZOOKEEPER_API_SERVER_NAME = "api-server-name"; public static final String STAT = "stat"; public static final String JOB = "job"; + public static final String ZOOKEEPER_TIMEOUT = "zookeeper.timeout"; } http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index c65feeb..2d8bc48 100644 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -214,20 +214,19 @@ amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876 connection.name=xsede #publisher +rabbitmq.broker.url=amqp://localhost:5672 + activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher publish.rabbitmq=false -<<<<<<< HEAD status.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher -rabbitmq.broker.url=amqp://localhost:5672 rabbitmq.status.exchange.name=airavata_rabbitmq_exchange rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange -======= + + activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher -rabbitmq.broker.url=amqp://gw111.iu.xsede.org:5672 rabbitmq.exchange.name=airavata_rabbitmq_exchange ->>>>>>> master ########################################################################### # Orchestrator module Configuration @@ -252,6 +251,7 @@ embedded.zk=true zookeeper.server.host=localhost zookeeper.server.port=2181 airavata-server=/api-server +zookeeper.timeout=30000 orchestrator-server=/orchestrator-server gfac-server=/gfac-server gfac-experiments=/gfac-experiments http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/credential-store/credential-store-webapp/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/credential-store/credential-store-webapp/src/main/resources/airavata-server.properties b/modules/credential-store/credential-store-webapp/src/main/resources/airavata-server.properties index 64e0160..f2e8d82 100644 --- a/modules/credential-store/credential-store-webapp/src/main/resources/airavata-server.properties +++ b/modules/credential-store/credential-store-webapp/src/main/resources/airavata-server.properties @@ -228,6 +228,7 @@ orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer embedded.zk=true zookeeper.server.host=localhost zookeeper.server.port=2181 +zookeeper.timeout=30000 airavata-server=/api-server orchestrator-server=/orchestrator-server gfac-server=/gfac-server http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 88979a4..d45710e 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -66,6 +66,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer; + private static int requestCount=0; + private Registry registry; private AppCatalog appCatalog; @@ -96,7 +98,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { String zkhostPort = AiravataZKUtils.getZKhostPort(); airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST) + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT); - zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is required, this will only use to store some data + zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); // no watcher is required, this will only use to store some data gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); synchronized (mutex) { @@ -110,14 +112,14 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { appCatalog = AppCatalogFactory.getAppCatalog(); setGatewayProperties(); BetterGfacImpl.startDaemonHandlers(); - BetterGfacImpl.startStatusUpdators(registry, zk, publisher); - inHandlerFutures = new ArrayList<Future>(); if (ServerSettings.isGFacPassiveMode()) { rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler()); - } + BetterGfacImpl.startStatusUpdators(registry, zk, publisher, rabbitMQTaskLaunchConsumer); + inHandlerFutures = new ArrayList<Future>(); + } catch (ApplicationSettingsException e) { logger.error("Error initialising GFAC", e); throw new Exception("Error initialising GFAC", e); @@ -182,7 +184,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { break; case Expired:case Disconnected: try { - zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this); + zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this); synchronized (mutex) { mutex.wait(); // waiting for the syncConnected event } @@ -231,6 +233,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { * @param gatewayId */ public boolean submitJob(String experimentId, String taskId, String gatewayId) throws TException { + requestCount++; + logger.info("-----------------------------------------------------" + requestCount+"-----------------------------------------------------"); logger.infoId(experimentId, "GFac Received submit jog request for the Experiment: {} TaskId: {}", experimentId, taskId); GFac gfac = getGfac(); InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId, taskId, gatewayId); http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 7559878..3cd07c6 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -49,6 +49,7 @@ import org.apache.airavata.gfac.core.states.GfacPluginState; import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.messaging.core.PublisherFactory; +import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.appinterface.DataType; @@ -111,7 +112,7 @@ public class BetterGfacImpl implements GFac,Watcher { this.appCatalog = appCatalog; } - public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) { + public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher,RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) { try { String[] listenerClassList = ServerSettings.getActivityListeners(); Publisher rabbitMQPublisher = null; @@ -122,7 +123,7 @@ public class BetterGfacImpl implements GFac,Watcher { Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class); AbstractActivityListener abstractActivityListener = aClass.newInstance(); activityListeners.add(abstractActivityListener); - abstractActivityListener.setup(publisher, registry, zk, rabbitMQPublisher); + abstractActivityListener.setup(publisher, registry, zk, rabbitMQPublisher,rabbitMQTaskLaunchConsumer); log.info("Registering listener: " + listenerClass); publisher.registerListener(abstractActivityListener); } http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java index 26902e7..eaa3c5f 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java @@ -30,6 +30,9 @@ import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest; import org.apache.airavata.gfac.core.utils.GFacUtils; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.messaging.core.impl.RabbitMQProducer; +import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -44,19 +47,21 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc private static Integer mutex = -1; + private RabbitMQTaskLaunchConsumer consumer; + @Subscribe public void updateZK(GfacExperimentStateChangeRequest statusChangeRequest) throws Exception { logger.info("Gfac internal state changed to: " + statusChangeRequest.getState().toString()); MonitorID monitorID = statusChangeRequest.getMonitorID(); - String experimentPath = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments") + - File.separator + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME) + File.separator + statusChangeRequest.getMonitorID().getExperimentID() + "+" + monitorID.getTaskID(); - String deliveryTagPath = experimentPath + GFacUtils.DELIVERY_TAG_POSTFIX; + String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + String experimentPath = experimentNode + File.separator + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME) + + File.separator + statusChangeRequest.getMonitorID().getExperimentID() + "+" + monitorID.getTaskID(); Stat exists = null; try { if (!zk.getState().isConnected()) { String zkhostPort = AiravataZKUtils.getZKhostPort(); - zk = new ZooKeeper(zkhostPort, 6000, this); - synchronized (mutex){ + zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); + synchronized (mutex) { mutex.wait(); } } @@ -77,11 +82,11 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc throw new Exception(e.getMessage(), e); } Stat state = zk.exists(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false); - if(state == null) { + if (state == null) { // state znode has to be created zk.create(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - }else { + } else { zk.setData(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), state.getVersion()); } @@ -89,12 +94,20 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc case COMPLETED: logger.info("Experiment Completed, So removing the ZK entry for the experiment" + monitorID.getExperimentID()); logger.info("Zookeeper experiment Path: " + experimentPath); + if (ServerSettings.isGFacPassiveMode()) { + consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(), + monitorID.getTaskID(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME))); + } ZKUtil.deleteRecursive(zk, experimentPath); break; case FAILED: logger.info("Experiment Failed, So removing the ZK entry for the experiment" + monitorID.getExperimentID()); logger.info("Zookeeper experiment Path: " + experimentPath); - ZKUtil.deleteRecursive(zk,experimentPath); + if (ServerSettings.isGFacPassiveMode()) { + consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(), + monitorID.getTaskID(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME))); + } + ZKUtil.deleteRecursive(zk, experimentPath); break; default: } @@ -105,6 +118,9 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc if (configuration instanceof ZooKeeper) { this.zk = (ZooKeeper) configuration; } + if (configuration instanceof RabbitMQTaskLaunchConsumer) { + this.consumer = (RabbitMQTaskLaunchConsumer) configuration; + } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java index 55da288..aefe490 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java @@ -94,10 +94,11 @@ public class MonitorID { experimentID = jobExecutionContext.getExperiment().getExperimentID(); workflowNodeID = jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId();// at this point we only have one node todo: fix this try { - jobID = jobExecutionContext.getJobDetails().getJobID(); jobName = jobExecutionContext.getJobDetails().getJobName(); + jobID = jobExecutionContext.getJobDetails().getJobID(); }catch(NullPointerException e){ logger.error("There is not job created at this point"); + // this is not a big deal we create MonitorId before having a jobId or job Name } } http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java index 83928c3..707cf97 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java @@ -1159,29 +1159,13 @@ public class GFacUtils { public static boolean createExperimentEntryForPassive(String experimentID, String taskID, ZooKeeper zk, String experimentNode, String pickedChild, String tokenId,long deliveryTag) throws KeeperException, - InterruptedException { + InterruptedException, ApplicationSettingsException { String experimentPath = experimentNode + File.separator + pickedChild; String newExpNode = experimentPath + File.separator + experimentID + "+" + taskID; Stat exists1 = zk.exists(newExpNode, false); String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk); - String foundExperimentPath = null; if (exists1 == null && experimentEntry == null) { // this means this is a very new experiment - List<String> runningGfacNodeNames = AiravataZKUtils.getAllGfacNodeNames(zk); // here we take old gfac servers - - for (String gfacServerNode : runningGfacNodeNames) { - if (!gfacServerNode.equals(pickedChild)) { - foundExperimentPath = experimentNode + File.separator - + gfacServerNode + File.separator + experimentID - + "+" + taskID; - exists1 = zk.exists(foundExperimentPath, true); - if (exists1 != null) { // when the experiment is found we - // break the loop - break; - } - } - } - if (exists1 == null) { // OK this is a pretty new experiment so we // are going to create a new node log.info("This is a new Job, so creating all the experiment docs from the scratch"); Stat expParent = zk.exists(newExpNode, false); @@ -1199,34 +1183,35 @@ public class GFacUtils { String s1 = zk.create(newExpNode + File.separator + "operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.exists(s1, true);// we want to know when this node get deleted - String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, ByteBuffer.allocate(8).putLong(deliveryTag).array(), ZooDefs.Ids.OPEN_ACL_UNSAFE, // here we store the value of delivery message + String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE, // here we store the value of delivery message CreateMode.PERSISTENT); - } else { - // ohhh this node exists in some other failed gfac folder, we - // have to move it to this gfac experiment list,safely + }else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,taskID,zk) ){ + // this happens when a cancel request comes to a differnt gfac node, in this case we do not move gfac experiment + // node to gfac node specific location, because original request execution will fail with errors + log.error("This experiment is already cancelled and its already executing the cancel operation so cannot submit again !"); + return false; + } else if(experimentEntry != null && !GFacUtils.isCancelled(experimentID,taskID,zk)){ + if(ServerSettings.isGFacPassiveMode()){ + log.error("ExperimentID: " + experimentID + " taskID: " + taskID + + " was running by some Gfac instance,but it failed"); log.info("This is an old Job, so copying data from old experiment location"); zk.create(newExpNode, - zk.getData(foundExperimentPath, false, exists1), + zk.getData(experimentEntry, false, exists1), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - List<String> children = zk.getChildren(foundExperimentPath, + List<String> children = zk.getChildren(experimentEntry, false); for (String childNode1 : children) { - String level1 = foundExperimentPath + File.separator + String level1 = experimentEntry + File.separator + childNode1; - Stat exists2 = zk.exists(level1, false); // no need to check - // exists + Stat exists2 = zk.exists(level1, false); // no need to check exists String newLeve1 = newExpNode + File.separator + childNode1; - log.info("Creating new znode: " + newLeve1); // these has to - // be info - // logs + log.info("Creating new znode: " + newLeve1); // these has to be info logs zk.create(newLeve1, zk.getData(level1, false, exists2), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); for (String childNode2 : zk.getChildren(level1, false)) { String level2 = level1 + File.separator + childNode2; - Stat exists3 = zk.exists(level2, false); // no need to - // check - // exists + Stat exists3 = zk.exists(level2, false); // no need to check exists String newLeve2 = newLeve1 + File.separator + childNode2; log.info("Creating new znode: " + newLeve2); @@ -1239,40 +1224,47 @@ public class GFacUtils { // old experiment,otherwise we do // not delete a single file log.info("After a successful copying of experiment data for an old experiment we delete the old data"); - log.info("Deleting experiment data: " + foundExperimentPath); - ZKUtil.deleteRecursive(zk, foundExperimentPath); - } - }else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,taskID,zk) ){ - // this happens when a cancel request comes to a differnt gfac node, in this case we do not move gfac experiment - // node to gfac node specific location, because original request execution will fail with errors - log.error("This experiment is already cancelled and its already executing the cancel operation so cannot submit again !"); - return false; - } else { - log.error("ExperimentID: " + experimentID + " taskID: " + taskID - + " is already running by this Gfac instance"); - List<String> runningGfacNodeNames = AiravataZKUtils - .getAllGfacNodeNames(zk); // here we take old gfac servers - // too - for (String gfacServerNode : runningGfacNodeNames) { - if (!gfacServerNode.equals(pickedChild)) { - foundExperimentPath = experimentNode + File.separator - + gfacServerNode + File.separator + experimentID - + "+" + taskID; - break; + log.info("Deleting experiment data: " + experimentEntry); + ZKUtil.deleteRecursive(zk, experimentEntry); + }else { + log.error("ExperimentID: " + experimentID + " taskID: " + taskID + + " is already running by this Gfac instance"); + List<String> runningGfacNodeNames = AiravataZKUtils + .getAllGfacNodeNames(zk); // here we take old gfac servers + // too + for (String gfacServerNode : runningGfacNodeNames) { + if (!gfacServerNode.equals(pickedChild)) { + experimentEntry = experimentNode + File.separator + + gfacServerNode + File.separator + experimentID + + "+" + taskID; + break; + } + } + if(experimentEntry!=null) { + ZKUtil.deleteRecursive(zk, experimentEntry); } } - ZKUtil.deleteRecursive(zk, foundExperimentPath); + } return true; } + /** + * This will return a value if the server is down because we iterate through exisiting experiment nodes, not + * through gfac-server nodes + * @param experimentID + * @param taskID + * @param zk + * @return + * @throws KeeperException + * @throws InterruptedException + */ public static String findExperimentEntry(String experimentID, String taskID, ZooKeeper zk ) throws KeeperException, InterruptedException { - String gfacServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); - List<String> children = zk.getChildren(gfacServer, false); + List<String> children = zk.getChildren(experimentNode, false); for(String pickedChild:children) { String experimentPath = experimentNode + File.separator + pickedChild; String newExpNode = experimentPath + File.separator + experimentID @@ -1341,6 +1333,18 @@ public class GFacUtils { } } + public static long getDeliveryTag(String experimentID, + String taskID, ZooKeeper zk, String experimentNode, + String pickedChild) throws KeeperException, InterruptedException,GFacException { + String experimentPath = experimentNode + File.separator + pickedChild; + String deliveryTagPath = experimentPath + File.separator + experimentID + + "+" + taskID + DELIVERY_TAG_POSTFIX; + Stat exists = zk.exists(deliveryTagPath, false); + if(exists==null) { + throw new GFacException("Cannot find delivery Tag for this experiment"); + } + return bytesToLong(zk.getData(deliveryTagPath, false, exists)); + } public static String getPluginData(JobExecutionContext jobExecutionContext, String className) throws ApplicationSettingsException, KeeperException, InterruptedException { @@ -1452,4 +1456,17 @@ public class GFacUtils { return sb.toString(); } + + public static byte[] longToBytes(long x) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(x); + return buffer.array(); + } + + public static long bytesToLong(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.put(bytes); + buffer.flip();//need flip + return buffer.getLong(); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java index 9f055e9..d62d3d7 100644 --- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java +++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java @@ -124,9 +124,9 @@ public class LocalProvider extends AbstractProvider { // log info log.info("Command = " + InputUtils.buildCommand(cmdList)); log.info("Working dir = " + builder.directory()); - for (String key : builder.environment().keySet()) { + /*for (String key : builder.environment().keySet()) { log.info("Env[" + key + "] = " + builder.environment().get(key)); - } + }*/ } public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException { http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java index cbac726..15b7241 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java @@ -199,7 +199,7 @@ public class CommonUtils { if (zk == null || !zk.getState().isConnected()) { try { final CountDownLatch countDownLatch = new CountDownLatch(1); - zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, new Watcher() { + zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), new Watcher() { @Override public void process(WatchedEvent event) { countDownLatch.countDown(); http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java index 7c88a25..9cad924 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java @@ -165,14 +165,15 @@ public class RabbitMQTaskLaunchConsumer { event = taskTerminateEvent; gatewayId = null; } + System.out.println("*deliveryTag:"+deliveryTag); MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId,deliveryTag); messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime())); handler.onMessage(messageContext); - try { + /*try { channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done } catch (IOException e) { logger.error(e.getMessage(), e); - } + }*/ } catch (TException e) { String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id; log.warn(msg, e); http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index f430bc9..63f0d9c 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -115,7 +115,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, // setGatewayName(ServerSettings.getDefaultUserGateway()); setAiravataUserName(ServerSettings.getDefaultUser()); try { - zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is + zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); // no watcher is // required, this // will only use to // store some data @@ -303,7 +303,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, case Expired: case Disconnected: try { - zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this); + zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this); synchronized (mutex) { mutex.wait(); // waiting for the syncConnected event } http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java index f19b949..993a303 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java @@ -70,7 +70,7 @@ public class OrchestratorRecoveryHandler implements Watcher { */ public void recover() throws OrchestratorException, ApplicationSettingsException, IOException, KeeperException, InterruptedException { String zkhostPort = AiravataZKUtils.getZKhostPort(); - zk = new ZooKeeper(zkhostPort, 6000, this); + zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); synchronized (mutex) { mutex.wait(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java index 8066113..c17638b 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java @@ -105,42 +105,30 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { try { if (zk == null || !zk.getState().isConnected()) { String zkhostPort = AiravataZKUtils.getZKhostPort(); - zk = new ZooKeeper(zkhostPort, 6000, this); + zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); synchronized (mutex) { mutex.wait(); } } - String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); - List<String> children = zk.getChildren(gfacServer, this); - - if (children.size() == 0) { - // Zookeeper data need cleaning - throw new OrchestratorException("There is no active GFac instance to route the request"); - } else { - String gatewayId = null; - CredentialReader credentialReader = GFacUtils.getCredentialReader(); - if (credentialReader != null) { - try { - gatewayId = credentialReader.getGatewayID(tokenId); - } catch (Exception e) { - logger.error(e.getLocalizedMessage()); - } + String gatewayId = null; + CredentialReader credentialReader = GFacUtils.getCredentialReader(); + if (credentialReader != null) { + try { + gatewayId = credentialReader.getGatewayID(tokenId); + } catch (Exception e) { + logger.error(e.getLocalizedMessage()); } - if(gatewayId == null || gatewayId.isEmpty()){ - gatewayId = ServerSettings.getDefaultUserGateway(); - } - - TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId,tokenId); - MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId); - messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - publisher.publish(messageContext); } + if (gatewayId == null || gatewayId.isEmpty()) { + gatewayId = ServerSettings.getDefaultUserGateway(); + } + TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId, tokenId); + MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId); + messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(messageContext); } catch (InterruptedException e) { logger.error(e.getMessage(), e); throw new OrchestratorException(e); - } catch (KeeperException e) { - logger.error(e.getMessage(), e); - throw new OrchestratorException(e); } catch (ApplicationSettingsException e) { logger.error(e.getMessage(), e); throw new OrchestratorException(e); @@ -167,7 +155,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { try { if (zk == null || !zk.getState().isConnected()) { String zkhostPort = AiravataZKUtils.getZKhostPort(); - zk = new ZooKeeper(zkhostPort, 6000, this); + zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); synchronized (mutex) { mutex.wait(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java index b855de2..5a1be5a 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java @@ -78,7 +78,7 @@ public class GFACRPCJobSubmitter implements JobSubmitter, Watcher { try { if (zk == null || !zk.getState().isConnected()) { String zkhostPort = AiravataZKUtils.getZKhostPort(); - zk = new ZooKeeper(zkhostPort, 6000, this); + zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); synchronized (mutex) { mutex.wait(); } @@ -146,7 +146,7 @@ public class GFACRPCJobSubmitter implements JobSubmitter, Watcher { try { if (zk == null || !zk.getState().isConnected()) { String zkhostPort = AiravataZKUtils.getZKhostPort(); - zk = new ZooKeeper(zkhostPort, 6000, this); + zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); synchronized (mutex) { mutex.wait(); }
