merge changes of master - AIRAVATA-1511
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/65ad5860 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/65ad5860 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/65ad5860 Branch: refs/heads/gfac_appcatalog_int Commit: 65ad58606fdd4d4a8a1aa5c11a0bc7bdc1f6ac9c Parents: 3693892 f7de359 Author: Chathuri Wimalasena <[email protected]> Authored: Tue Nov 11 11:31:18 2014 -0500 Committer: Chathuri Wimalasena <[email protected]> Committed: Tue Nov 11 11:31:18 2014 -0500 ---------------------------------------------------------------------- .../client/samples/CreateLaunchExperiment.java | 21 +- .../tools/RegisterSampleApplications.java | 9 +- .../resources/schemas/GFacParameterTypes.xsd | 2 +- .../java/src/main/assembly/bin-assembly.xml | 280 ++++++++++--------- .../server/src/main/assembly/bin-assembly.xml | 25 +- .../gfac/bes/provider/impl/BESProvider.java | 206 ++++++++++++++ .../bes/security/UNICORESecurityContext.java | 6 +- .../gfac/bes/utils/DataTransferrer.java | 53 +++- .../airavata/gfac/bes/utils/JSDLUtils.java | 6 +- .../gfac/bes/utils/UASDataStagingProcessor.java | 73 ++--- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 61 ++-- .../airavata/gfac/monitor/util/CommonUtils.java | 7 +- .../registry/jpa/impl/ExperimentRegistry.java | 7 +- 13 files changed, 525 insertions(+), 231 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --cc airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index 2d7768b,05395e6..dbb4a0c --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@@ -56,10 -55,11 +58,10 @@@ public class CreateLaunchExperiment private static final String DEFAULT_GATEWAY = "default.registry.gateway"; private static Airavata.Client airavataClient; - private static String echoAppId = "Echo_1869465f-f002-43a9-b243-c091f63ab059"; - private static String wrfAppId = "WRF_a458df70-6808-4d5d-ae32-c49082f2a6cc"; - private static String amberAppId = "Amber_1b99f73b-a88d-44e3-b04e-4f56ba95ed6f"; + private static String echoAppId = "Echo_636b4530-6fb2-4c9e-998a-b41e648aa70f"; + private static String wrfAppId = "WRF_d41bdc86-e280-4eb6-a045-708f69a8c116"; + private static String amberAppId = "Amber_b23ee051-90d6-4892-827e-622a2f6c95ee"; - private static String localHost = "localhost"; private static String trestlesHostName = "trestles.sdsc.xsede.org"; private static String unicoreHostName = "fsd-cloud15.zam.kfa-juelich.de"; @@@ -213,12 -213,17 +215,17 @@@ input.setType(DataType.STRING); input.setValue("Echoed_Output=Hello World"); exInputs.add(input); - DataObjectType i2 = new DataObjectType(); - i2.setKey("Input_to_Echo1"); ++ InputDataObjectType i2 = new InputDataObjectType(); ++ i2.setName("Input_to_Echo1"); + i2.setType(DataType.URI); + i2.setValue("http://shrib.com/22QmrrX4"); + exInputs.add(i2); - List<DataObjectType> exOut = new ArrayList<DataObjectType>(); - DataObjectType output = new DataObjectType(); - output.setKey("Echoed_Output"); + List<OutputDataObjectType> exOut = new ArrayList<OutputDataObjectType>(); + OutputDataObjectType output = new OutputDataObjectType(); + output.setName("Echoed_Output"); output.setType(DataType.STRING); - output.setValue(""); + output.setValue("22QmrrX4"); exOut.add(output); http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/tools/RegisterSampleApplications.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java ---------------------------------------------------------------------- diff --cc modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java index 398f05c,044ffa2..964e6d1 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java @@@ -105,165 -101,209 +105,371 @@@ public class BESProvider extends Abstra public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { ++<<<<<<< HEAD + StorageClient sc = null; + try { + JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface(); + JobSubmissionProtocol protocol = preferredJobSubmissionInterface.getJobSubmissionProtocol(); + String interfaceId = preferredJobSubmissionInterface.getJobSubmissionInterfaceId(); + String factoryUrl = null; + if (protocol.equals(JobSubmissionProtocol.UNICORE)) { + UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId); + factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL(); + } + EndpointReferenceType eprt = EndpointReferenceType.Factory + .newInstance(); + eprt.addNewAddress().setStringValue(factoryUrl); + String userDN = getUserName(jobExecutionContext); + + // TODO: to be removed + if (userDN == null || userDN.equalsIgnoreCase("admin")) { + userDN = "CN=zdv575, O=Ultrascan Gateway, C=DE"; + } + CreateActivityDocument cad = CreateActivityDocument.Factory + .newInstance(); + JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory + .newInstance(); + + // create storage + StorageCreator storageCreator = new StorageCreator(secProperties, + factoryUrl, 5, null); + sc = storageCreator.createStorage(); + + JobDefinitionType jobDefinition = JSDLGenerator.buildJSDLInstance( + jobExecutionContext, sc.getUrl()).getJobDefinition(); + cad.addNewCreateActivity().addNewActivityDocument() + .setJobDefinition(jobDefinition); + log.info("JSDL" + jobDefDoc.toString()); + + // upload files if any + DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc); + dt.uploadLocalFiles(); + + JobDetails jobDetails = new JobDetails(); + FactoryClient factory = new FactoryClient(eprt, secProperties); + + log.info(String.format("Activity Submitting to %s ... \n", + factoryUrl)); + jobExecutionContext.getNotifier().publish(new StartExecutionEvent()); + CreateActivityResponseDocument response = factory.createActivity(cad); + log.info(String.format("Activity Submitted to %s \n", factoryUrl)); + + EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier(); + + log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted."); + + // factory.waitWhileActivityIsDone(activityEpr, 1000); + jobId = WSUtilities.extractResourceID(activityEpr); + if (jobId == null) { + jobId = new Long(Calendar.getInstance().getTimeInMillis()) + .toString(); + } + log.info("JobID: " + jobId); + jobDetails.setJobID(activityEpr.toString()); + jobDetails.setJobDescription(activityEpr.toString()); + + jobExecutionContext.setJobDetails(jobDetails); + log.info(formatStatusMessage(activityEpr.getAddress() + .getStringValue(), factory.getActivityStatus(activityEpr) + .toString())); + + jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId)); + GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SUBMITTED); + + factory.getActivityStatus(activityEpr); + log.info(formatStatusMessage(activityEpr.getAddress() + .getStringValue(), factory.getActivityStatus(activityEpr) + .toString())); + + // TODO publish the status messages to the message bus + while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED) + && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED) + && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) { + + ActivityStatusType activityStatus = getStatus(factory, activityEpr); + JobState applicationJobStatus = getApplicationJobStatus(activityStatus); + String jobStatusMessage = "Status of job " + jobId + "is " + + applicationJobStatus; + GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, + applicationJobStatus); + + jobExecutionContext.getNotifier().publish( + new StatusChangeEvent(jobStatusMessage)); + + // GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId, + // applicationJobStatus); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + continue; + } + + ActivityStatusType activityStatus = null; + activityStatus = getStatus(factory, activityEpr); + log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString())); + ActivityClient activityClient; + activityClient = new ActivityClient(activityEpr, secProperties); + dt.setStorageClient(activityClient.getUspaceClient()); + + if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) { + String error = activityStatus.getFault().getFaultcode() + .getLocalPart() + + "\n" + + activityStatus.getFault().getFaultstring() + + "\n EXITCODE: " + activityStatus.getExitCode(); + log.info(error); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + dt.downloadStdOuts(); + } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) { + JobState applicationJobStatus = JobState.CANCELED; + String jobStatusMessage = "Status of job " + jobId + "is " + + applicationJobStatus; + jobExecutionContext.getNotifier().publish( + new StatusChangeEvent(jobStatusMessage)); + GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, + applicationJobStatus); + throw new GFacProviderException( + jobExecutionContext.getExperimentID() + "Job Canceled"); + } else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + if (activityStatus.getExitCode() == 0) { + dt.downloadRemoteFiles(); + } else { + dt.downloadStdOuts(); + } + } + } catch (AppCatalogException e) { + log.error("Error while retrieving UNICORE job submission.."); + throw new GFacProviderException("Error while retrieving UNICORE job submission..", e); + } catch (Exception e) { + log.error("Cannot create storage.."); + throw new GFacProviderException("Cannot create storage..", e); + } finally { + // destroy sms instance + try { + if (sc != null) { + sc.destroy(); + } + } catch (Exception e) { + log.warn( + "Cannot destroy temporary SMS instance:" + sc.getUrl(), + e); + } + } + + } ++======= + UnicoreHostType host = (UnicoreHostType) jobExecutionContext + .getApplicationContext().getHostDescription().getType(); + + String factoryUrl = host.getUnicoreBESEndPointArray()[0]; + + EndpointReferenceType eprt = EndpointReferenceType.Factory + .newInstance(); + eprt.addNewAddress().setStringValue(factoryUrl); + + // WSUtilities.addServerIdentity(eprt, serverDN); + + String userDN = getUserName(jobExecutionContext); + + // TODO: to be removed + if (userDN == null || userDN.equalsIgnoreCase("admin")) { + userDN = "CN=zdv575, O=Ultrascan Gateway, C=DE"; + } + + StorageClient sc = null; + + try { + + CreateActivityDocument cad = CreateActivityDocument.Factory + .newInstance(); + JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory + .newInstance(); + + // String xlogin = getCNFromUserDN(userDN); + + // create storage + StorageCreator storageCreator = new StorageCreator(secProperties, + factoryUrl, 5, null); + + try { + sc = storageCreator.createStorage(); + } catch (Exception e2) { + log.error("Cannot create storage.."); + throw new GFacProviderException("Cannot create storage..", e2); + } + + JobDefinitionType jobDefinition = jobDefDoc.addNewJobDefinition(); + try { + jobDefinition = JSDLGenerator.buildJSDLInstance( + jobExecutionContext, sc.getUrl()).getJobDefinition(); + cad.addNewCreateActivity().addNewActivityDocument() + .setJobDefinition(jobDefinition); + log.info("JSDL" + jobDefDoc.toString()); + } catch (Exception e1) { + throw new GFacProviderException( + "Cannot generate JSDL instance from the JobExecutionContext.", + e1); + } + + // upload files if any + DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc); + dt.uploadLocalFiles(); + + FactoryClient factory = null; + JobDetails jobDetails = new JobDetails(); + + try { + factory = new FactoryClient(eprt, secProperties); + } catch (Exception e) { + throw new GFacProviderException(e.getLocalizedMessage(), e); + } + CreateActivityResponseDocument response = null; + try { + log.info(String.format("Activity Submitting to %s ... \n", + factoryUrl)); + jobExecutionContext.getNotifier().publish(new StartExecutionEvent()); + response = factory.createActivity(cad); + log.info(String.format("Activity Submitted to %s \n", factoryUrl)); + } catch (Exception e) { + throw new GFacProviderException("Cannot create activity.", e); + } + EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier(); + + log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted."); + + // factory.waitWhileActivityIsDone(activityEpr, 1000); + jobId = WSUtilities.extractResourceID(activityEpr); + if (jobId == null) { + jobId = new Long(Calendar.getInstance().getTimeInMillis()) + .toString(); + } + log.info("JobID: " + jobId); + jobDetails.setJobID(jobId); + jobDetails.setJobDescription(jobId); + + jobExecutionContext.setJobDetails(jobDetails); + try { + log.info(formatStatusMessage(activityEpr.getAddress() + .getStringValue(), factory.getActivityStatus(activityEpr) + .toString())); + + jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId)); + // GFacUtils.saveJobStatus(jobExecutionContext, details,JobState.SUBMITTED); + + factory.getActivityStatus(activityEpr); + log.info(formatStatusMessage(activityEpr.getAddress() + .getStringValue(), factory.getActivityStatus(activityEpr) + .toString())); + + // TODO publish the status messages to the message bus + while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED) + && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED) + && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) { + + ActivityStatusType activityStatus = null; + try { + activityStatus = getStatus(factory, activityEpr); + JobState applicationJobStatus = getApplicationJobStatus(activityStatus); + String jobStatusMessage = "Status of job " + jobId + "is " + + applicationJobStatus; + //TODO: properly use GFacUtils.. + // GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus); + + jobExecutionContext.getNotifier().publish( + new StatusChangeEvent(jobStatusMessage)); + + // GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId, + // applicationJobStatus); + } catch (UnknownActivityIdentifierFault e) { + throw new GFacProviderException(e.getMessage(), + e.getCause()); + } + + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + continue; + } + }catch(Exception e) { + throw new GFacProviderException(e.getMessage(), + e.getCause()); + + } + + ActivityStatusType activityStatus = null; + try { + activityStatus = getStatus(factory, activityEpr); + log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString())); + ActivityClient activityClient; + activityClient = new ActivityClient(activityEpr,secProperties); + dt.setStorageClient(activityClient.getUspaceClient()); + } catch (Exception e1) { + throw new GFacProviderException(e1.getMessage(), + e1.getCause()); + } + + + + if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) { + String error = activityStatus.getFault().getFaultcode() + .getLocalPart() + + "\n" + + activityStatus.getFault().getFaultstring() + + "\n EXITCODE: " + activityStatus.getExitCode(); + log.info(error); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + dt.downloadStdOuts(); + } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) { + JobState applicationJobStatus = JobState.CANCELED; + String jobStatusMessage = "Status of job " + jobId + "is " + + applicationJobStatus; + jobExecutionContext.getNotifier().publish( + new StatusChangeEvent(jobStatusMessage)); + //TODO: properly use GFacUtils.. + // GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus); + throw new GFacProviderException( + jobExecutionContext.getExperimentID() + "Job Canceled"); + } + + else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + if (activityStatus.getExitCode() == 0) { + dt.downloadRemoteFiles(); + } else { + dt.downloadStdOuts(); + } + } + + } finally { + // destroy sms instance + try { + if (sc != null) { + sc.destroy(); + } + } catch (Exception e) { + log.warn( + "Cannot destroy temporary SMS instance:" + sc.getUrl(), + e); + } + } + + } ++>>>>>>> f7de359dcae3694912248e50a1a2fd5e30fc613e private JobState getApplicationJobStatus(ActivityStatusType activityStatus) { if (activityStatus == null) { http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --cc modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 25113fd,66cc5f7..171ca07 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@@ -156,9 -158,11 +156,10 @@@ public class HPCPullMonitor extends Pul try { take = this.queue.take(); List<HostMonitorData> hostMonitorData = take.getHostMonitorData(); - for (HostMonitorData iHostMonitorData : hostMonitorData) { + for (ListIterator<HostMonitorData> hostIterator = hostMonitorData.listIterator(); hostIterator.hasNext();) { + HostMonitorData iHostMonitorData = hostIterator.next(); - if (iHostMonitorData.getHost().getType() instanceof GsisshHostType - || iHostMonitorData.getHost().getType() instanceof SSHHostType) { - String hostName = iHostMonitorData.getHost().getType().getHostAddress(); + if (iHostMonitorData.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { + String hostName = iHostMonitorData.getComputeResourceDescription().getHostName(); ResourceConnection connection = null; if (connections.containsKey(hostName)) { if (!connections.get(hostName).isConnected()) { http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java ---------------------------------------------------------------------- diff --cc modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java index a83f5f1,8e9ae58..edbf39e --- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java +++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java @@@ -93,11 -91,12 +93,12 @@@ public class ExperimentRegistry addUserConfigData(userConfigurationData, experimentID); } - List<DataObjectType> experimentOutputs = experiment.getExperimentOutputs(); + List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs(); if (experimentOutputs != null && !experimentOutputs.isEmpty()){ - for (OutputDataObjectType output : experimentOutputs){ - output.setValue(""); - } + //TODO: short change. + // for (DataObjectType output : experimentOutputs){ + // output.setValue(""); + // } addExpOutputs(experimentOutputs, experimentID); }
