Repository: airavata Updated Branches: refs/heads/master 8a6b891d3 -> e76851d04
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java index ad9e62a..a78d3f0 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java @@ -20,13 +20,13 @@ */ package org.apache.airavata.gfac.impl; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; import org.apache.airavata.registry.cpi.AppCatalog; import org.apache.airavata.registry.cpi.AppCatalogException; import org.apache.airavata.common.utils.AiravataZKUtils; -import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.gfac.core.Constants; +import org.apache.airavata.gfac.core.GFacConstants; import org.apache.airavata.gfac.core.GFacConfiguration; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.Scheduler; @@ -98,7 +98,7 @@ public class BetterGfacImpl implements GFac { private static String ERROR_SENT = "ErrorSent"; private ExperimentCatalog experimentCatalog; private CuratorFramework curatorClient; - private MonitorPublisher monitorPublisher; + private LocalEventPublisher localEventPublisher; private static GFac gfacInstance; private boolean initialized = false; @@ -117,11 +117,10 @@ public class BetterGfacImpl implements GFac { return gfacInstance; } - @Override public boolean init(ExperimentCatalog experimentCatalog, AppCatalog appCatalog, CuratorFramework curatorClient, - MonitorPublisher publisher) { + LocalEventPublisher publisher) { this.experimentCatalog = experimentCatalog; - monitorPublisher = publisher; // This is a EventBus common for gfac + localEventPublisher = publisher; // This is a EventBus common for gfac this.curatorClient = curatorClient; return initialized = true; } @@ -153,13 +152,13 @@ public class BetterGfacImpl implements GFac { // FIXME: Here we need to update Experiment status to Failed, as we used chained update approach updating // task status will cause to update Experiment status. Remove this chained update approach and fix this correctly (update experiment status) if (jobExecutionContext != null) { - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), jobExecutionContext.getGatewayID()); TaskStatusChangeRequestEvent event = new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity); - monitorPublisher.publish(event); + localEventPublisher.publish(event); } throw new GFacException(e); } @@ -250,10 +249,10 @@ public class BetterGfacImpl implements GFac { List<InputDataObjectType> taskInputs = taskData.getApplicationInputs(); jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(taskInputs))); - jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID); + jobExecutionContext.setProperty(GFacConstants.PROP_TOPIC, experimentID); jobExecutionContext.setGfac(gfacInstance); jobExecutionContext.setCuratorClient(curatorClient); - jobExecutionContext.setMonitorPublisher(monitorPublisher); + jobExecutionContext.setLocalEventPublisher(localEventPublisher); // handle job submission protocol List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces(); @@ -431,7 +430,7 @@ public class BetterGfacImpl implements GFac { // Register log event listener. This is required in all scenarios. if (isNewJob(gfacExpState)) { // In this scenario We do everything from the beginning - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status launch(jobExecutionContext); } else if (isCompletedJob(gfacExpState)) { @@ -492,7 +491,7 @@ public class BetterGfacImpl implements GFac { try { GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment String workflowInstanceID = null; - if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) { + if ((workflowInstanceID = (String) jobExecutionContext.getProperty(GFacConstants.PROP_WORKFLOW_INSTANCE_ID)) != null) { //todo implement WorkflowTrackingListener properly } if (gfacExpState == GfacExperimentState.PROVIDERINVOKING || gfacExpState == GfacExperimentState.JOBSUBMITTED @@ -503,7 +502,7 @@ public class BetterGfacImpl implements GFac { invokeProviderCancel(jobExecutionContext); } catch (GFacException e) { // we make the experiment as failed due to exception scenario - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); jobExecutionContext.setProperty(ERROR_SENT, "true"); throw new GFacException(e.getMessage(), e); } @@ -578,13 +577,13 @@ public class BetterGfacImpl implements GFac { log.error(e.getMessage(), e); try { // we make the experiment as failed due to exception scenario - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); JobIdentifier jobIdentity = new JobIdentifier( jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), jobExecutionContext.getGatewayID()); - monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity)); + localEventPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity)); GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); } catch (NullPointerException e1) { log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " @@ -593,7 +592,7 @@ public class BetterGfacImpl implements GFac { jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), jobExecutionContext.getGatewayID()); - monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); + localEventPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); } @@ -632,7 +631,7 @@ public class BetterGfacImpl implements GFac { // avoid complexity } else { log.info("Experiment is cancelled, so launch operation is stopping immediately"); - GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED); + GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.CANCELED); return; // if the job is cancelled, status change is handled in cancel operation this thread simply has to be returned } // if (experimentID != null){ @@ -646,19 +645,19 @@ public class BetterGfacImpl implements GFac { invokeProviderExecute(jobExecutionContext); } else { log.info("Experiment is cancelled, so launch operation is stopping immediately"); - GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED); + GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.CANCELED); return; } } catch (Exception e) { try { // we make the experiment as failed due to exception scenario - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); - // monitorPublisher.publish(new + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); + // localEventPublisher.publish(new // ExperimentStatusChangedEvent(new // ExperimentIdentity(jobExecutionContext.getExperimentID()), // ExperimentState.FAILED)); // Updating the task status if there's any task associated - // monitorPublisher.publish(new TaskStatusChangeRequest( + // localEventPublisher.publish(new TaskStatusChangeRequest( // new TaskIdentity(jobExecutionContext.getExperimentID(), // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), // jobExecutionContext.getTaskData().getTaskID()), @@ -668,17 +667,17 @@ public class BetterGfacImpl implements GFac { jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), jobExecutionContext.getGatewayID()); - monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity)); + localEventPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity)); } catch (NullPointerException e1) { log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " + "NullPointerException occurred because at this point there might not have Job Created", e1, e); - //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED)); + //localEventPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED)); // Updating the task status if there's any task associated TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), jobExecutionContext.getGatewayID()); - monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); + localEventPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); } jobExecutionContext.setProperty(ERROR_SENT, "true"); @@ -689,13 +688,13 @@ public class BetterGfacImpl implements GFac { private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws Exception { GFacProvider provider = jobExecutionContext.getProvider(); if (provider != null) { - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING)); GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName()); initProvider(provider, jobExecutionContext); executeProvider(provider, jobExecutionContext); disposeProvider(provider, jobExecutionContext); GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED); - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED)); } if (GFacUtils.isSynchronousMode(jobExecutionContext)) { invokeOutFlowHandlers(jobExecutionContext); @@ -706,7 +705,7 @@ public class BetterGfacImpl implements GFac { GFacProvider provider = jobExecutionContext.getProvider(); if (provider != null) { if (submit) { - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING)); GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName()); GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName()); if (plState != null && plState == GfacHandlerState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state @@ -717,11 +716,11 @@ public class BetterGfacImpl implements GFac { provider.recover(jobExecutionContext); } GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED); - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED)); } else { disposeProvider(provider, jobExecutionContext); GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED); - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED)); } } @@ -748,7 +747,7 @@ public class BetterGfacImpl implements GFac { private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws Exception { GFacProvider provider = jobExecutionContext.getProvider(); if (provider != null) { - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING)); GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName()); GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName()); if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state @@ -759,7 +758,7 @@ public class BetterGfacImpl implements GFac { provider.recover(jobExecutionContext); } GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED); - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED)); } if (GFacUtils.isSynchronousMode(jobExecutionContext)) @@ -815,7 +814,7 @@ public class BetterGfacImpl implements GFac { private void invokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException { List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers(); try { - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.INHANDLERSINVOKING)); for (GFacHandlerConfig handlerClassName : handlers) { if (!isCancelling(jobExecutionContext)) { @@ -842,11 +841,11 @@ public class BetterGfacImpl implements GFac { } } else { log.info("Experiment execution is cancelled, so InHandler invocation is going to stop"); - GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED); + GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.CANCELED); break; } } - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.INHANDLERSINVOKED)); } catch (Exception e) { throw new GFacException("Error Invoking Handlers:" + e.getMessage(), e); @@ -879,7 +878,7 @@ public class BetterGfacImpl implements GFac { } } try { - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING)); for (GFacHandlerConfig handlerClassName : handlers) { if (!isCancel(jobExecutionContext)) { Class<? extends GFacHandler> handlerClass; @@ -900,7 +899,7 @@ public class BetterGfacImpl implements GFac { handler.invoke(jobExecutionContext); GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED); } catch (Exception e) { - GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED); + GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.FAILED); try { StringWriter errors = new StringWriter(); e.printStackTrace(new PrintWriter(errors)); @@ -913,12 +912,12 @@ public class BetterGfacImpl implements GFac { } else { log.info("Experiment execution is cancelled, so OutHandler invocation is stopped"); if (isCancelling(jobExecutionContext)) { - GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED); + GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.CANCELED); } break; } } - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED)); } catch (Exception e) { throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e); } @@ -928,7 +927,7 @@ public class BetterGfacImpl implements GFac { // At this point all the execution is finished so we update the task and experiment statuses. // Handler authors does not have to worry about updating experiment or task statuses. -// monitorPublisher.publish(new +// localEventPublisher.publish(new // ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), // ExperimentState.COMPLETED)); // Updating the task status if there's any task associated @@ -936,8 +935,8 @@ public class BetterGfacImpl implements GFac { jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), jobExecutionContext.getGatewayID()); - monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity)); - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED)); + localEventPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED)); } @@ -952,7 +951,7 @@ public class BetterGfacImpl implements GFac { private void reInvokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException { List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers(); try { - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.INHANDLERSINVOKING)); for (GFacHandlerConfig handlerClassName : handlers) { Class<? extends GFacHandler> handlerClass; @@ -982,7 +981,7 @@ public class BetterGfacImpl implements GFac { throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); } } - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.INHANDLERSINVOKED)); } catch (Exception e) { try { @@ -1016,7 +1015,7 @@ public class BetterGfacImpl implements GFac { } launch(jobExecutionContext); } - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING)); for (GFacHandlerConfig handlerClassName : handlers) { Class<? extends GFacHandler> handlerClass; GFacHandler handler; @@ -1076,11 +1075,11 @@ public class BetterGfacImpl implements GFac { throw new GFacException("Error Executing a OutFlow Handler", e); } } - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED)); // At this point all the execution is finished so we update the task and experiment statuses. // Handler authors does not have to worry about updating experiment or task statuses. -// monitorPublisher.publish(new +// localEventPublisher.publish(new // ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), // ExperimentState.COMPLETED)); // Updating the task status if there's any task associated @@ -1089,8 +1088,8 @@ public class BetterGfacImpl implements GFac { jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), jobExecutionContext.getGatewayID()); - monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity)); - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED)); + localEventPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity)); + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED)); } private boolean isCancelled(JobExecutionContext executionContext) { http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java new file mode 100644 index 0000000..827ab55 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java @@ -0,0 +1,28 @@ +package org.apache.airavata.gfac.impl; + +import org.apache.airavata.gfac.core.GFac; +import org.apache.airavata.gfac.core.GFacException; +import org.apache.airavata.gfac.core.context.ProcessContext; + +public class GFacImpl implements GFac { + + @Override + public boolean submitProcess(ProcessContext processContext) throws GFacException { + return false; + } + + @Override + public void invokeProcessOutFlow(ProcessContext processContext) throws GFacException { + + } + + @Override + public void reInvokeProcessOutFlow(ProcessContext processContext) throws GFacException { + + } + + @Override + public boolean cancelProcess(ProcessContext processContext) throws GFacException { + return false; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java index 048889a..b682007 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java @@ -20,7 +20,7 @@ */ package org.apache.airavata.gfac.impl; -import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.GFac; @@ -44,20 +44,20 @@ public class OutHandlerWorker implements Runnable { private MonitorID monitorID; - private MonitorPublisher monitorPublisher; + private LocalEventPublisher localEventPublisher; private JobExecutionContext jEC; - public OutHandlerWorker(GFac gfac, MonitorID monitorID,MonitorPublisher monitorPublisher) { + public OutHandlerWorker(GFac gfac, MonitorID monitorID,LocalEventPublisher localEventPublisher) { this.gfac = gfac; this.monitorID = monitorID; - this.monitorPublisher = monitorPublisher; + this.localEventPublisher = localEventPublisher; this.jEC = monitorID.getJobExecutionContext(); } public OutHandlerWorker(JobExecutionContext jEC) { this.jEC = jEC; this.gfac = jEC.getGfac(); - this.monitorPublisher = jEC.getMonitorPublisher(); + this.localEventPublisher = jEC.getLocalEventPublisher(); } @Override @@ -69,7 +69,7 @@ public class OutHandlerWorker implements Runnable { logger.error(e.getMessage(),e); TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID(), monitorID.getJobExecutionContext().getGatewayID()); //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status - monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier)); + localEventPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier)); try { StringWriter errors = new StringWriter(); e.printStackTrace(new PrintWriter(errors)); @@ -81,8 +81,8 @@ public class OutHandlerWorker implements Runnable { // Save error details to registry } -// monitorPublisher.publish(monitorID.getStatus()); - monitorPublisher.publish(jEC.getJobDetails().getJobStatus()); +// localEventPublisher.publish(monitorID.getStatus()); + localEventPublisher.publish(jEC.getJobDetails().getJobStatus()); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java index a0ace45..5babd92 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java @@ -29,7 +29,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import org.apache.airavata.gfac.core.Constants; +import org.apache.airavata.gfac.core.GFacConstants; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.provider.AbstractProvider; @@ -112,8 +112,8 @@ public class LocalProvider extends AbstractProvider { initProcessBuilder(jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription()); // extra environment variables - builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir()); - builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir()); + builder.environment().put(GFacConstants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir()); + builder.environment().put(GFacConstants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir()); // set working directory builder.directory(new File(jobExecutionContext.getWorkingDir())); @@ -178,7 +178,7 @@ public class LocalProvider extends AbstractProvider { jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), jobExecutionContext.getGatewayID()); - jobExecutionContext.getMonitorPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity)); + jobExecutionContext.getLocalEventPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity)); } catch (IOException io) { throw new GFacProviderException(io.getMessage(), io); } catch (InterruptedException e) { @@ -234,7 +234,7 @@ public class LocalProvider extends AbstractProvider { jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), jobExecutionContext.getGatewayID()); - jobExecutionContext.getMonitorPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity)); + jobExecutionContext.getLocalEventPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity)); } catch (XmlException e) { throw new GFacProviderException("Cannot read output:" + e.getMessage(), e); } catch (IOException io) { http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java index b4ac3a9..72ffad6 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java @@ -20,7 +20,6 @@ */ package org.apache.airavata.gfac.monitor.core; -import org.apache.airavata.common.utils.MonitorPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,7 +27,7 @@ import org.slf4j.LoggerFactory; * This is the abstract Monitor which needs to be used by * any Monitoring implementation which expect nto consume * to store the status to registry. Because they have to - * use the MonitorPublisher to publish the monitoring statuses + * use the LocalEventPublisher to publish the monitoring statuses * to the Event Bus. All the Monitor statuses publish to the eventbus * will be saved to the Registry. */ http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java index 992317d..2c6b69b 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -323,7 +323,7 @@ public class EmailBasedMonitor implements Runnable{ "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(), jobStatus.getJobIdentity().getTaskId()); - jobExecutionContext.getMonitorPublisher().publish(jobStatus); + jobExecutionContext.getLocalEventPublisher().publish(jobStatus); } private void writeEnvelopeOnError(Message m) throws MessagingException { http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java index 58c0946..a7e5b90 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java @@ -75,7 +75,7 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { super.invoke(jobExecutionContext); hpcPullMonitor.setGfac(jobExecutionContext.getGfac()); - hpcPullMonitor.setPublisher(jobExecutionContext.getMonitorPublisher()); + hpcPullMonitor.setPublisher(jobExecutionContext.getLocalEventPublisher()); MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext); try { /* ZooKeeper zk = jobExecutionContext.getZk(); http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 54dd8e3..d9e815b 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -23,7 +23,7 @@ package org.apache.airavata.gfac.monitor.impl.pull.qstat; import com.google.common.eventbus.EventBus; import org.apache.airavata.common.logger.AiravataLogger; import org.apache.airavata.common.logger.AiravataLoggerFactory; -import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; import org.apache.airavata.gfac.core.SSHApiException; @@ -64,7 +64,7 @@ public class HPCPullMonitor extends PullMonitor { private Map<String, ResourceConnection> connections; - private MonitorPublisher publisher; + private LocalEventPublisher publisher; private LinkedBlockingQueue<String> cancelJobList; @@ -79,17 +79,17 @@ public class HPCPullMonitor extends PullMonitor { public HPCPullMonitor() { connections = new HashMap<String, ResourceConnection>(); queue = new LinkedBlockingDeque<UserMonitorData>(); - publisher = new MonitorPublisher(new EventBus()); + publisher = new LocalEventPublisher(new EventBus()); cancelJobList = new LinkedBlockingQueue<String>(); completedJobsFromPush = new ArrayList<String>(); (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen(); removeList = new ArrayList<MonitorID>(); } - public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) { + public HPCPullMonitor(LocalEventPublisher localEventPublisher, AuthenticationInfo authInfo) { connections = new HashMap<String, ResourceConnection>(); queue = new LinkedBlockingDeque<UserMonitorData>(); - publisher = monitorPublisher; + publisher = localEventPublisher; authenticationInfo = authInfo; cancelJobList = new LinkedBlockingQueue<String>(); this.completedJobsFromPush = new ArrayList<String>(); @@ -97,7 +97,7 @@ public class HPCPullMonitor extends PullMonitor { removeList = new ArrayList<MonitorID>(); } - public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) { + public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, LocalEventPublisher publisher) { this.queue = queue; this.publisher = publisher; connections = new HashMap<String, ResourceConnection>(); @@ -396,11 +396,11 @@ public class HPCPullMonitor extends PullMonitor { return true; } - public MonitorPublisher getPublisher() { + public LocalEventPublisher getPublisher() { return publisher; } - public void setPublisher(MonitorPublisher publisher) { + public void setPublisher(LocalEventPublisher publisher) { this.publisher = publisher; } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java index de8cd8c..0d52f95 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java @@ -28,7 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; -import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.monitor.core.PushMonitor; @@ -62,9 +62,9 @@ public class AMQPMonitor extends PushMonitor { */ private Map<String, Channel> availableChannels; - private MonitorPublisher publisher; + private LocalEventPublisher publisher; - private MonitorPublisher localPublisher; + private LocalEventPublisher localPublisher; private BlockingQueue<MonitorID> runningQueue; @@ -81,7 +81,7 @@ public class AMQPMonitor extends PushMonitor { public AMQPMonitor(){ } - public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue, + public AMQPMonitor(LocalEventPublisher publisher, BlockingQueue<MonitorID> runningQueue, BlockingQueue<MonitorID> finishQueue, String proxyPath,String connectionName,List<String> hosts) { this.publisher = publisher; @@ -91,7 +91,7 @@ public class AMQPMonitor extends PushMonitor { this.connectionName = connectionName; this.proxyPath = proxyPath; this.amqpHosts = hosts; - this.localPublisher = new MonitorPublisher(new EventBus()); + this.localPublisher = new LocalEventPublisher(new EventBus()); this.localPublisher.registerListener(this); } @@ -100,7 +100,7 @@ public class AMQPMonitor extends PushMonitor { this.connectionName = connectionName; this.proxyPath = proxyPath; this.amqpHosts = hosts; - this.localPublisher = new MonitorPublisher(new EventBus()); + this.localPublisher = new LocalEventPublisher(new EventBus()); this.localPublisher.registerListener(this); } @@ -230,11 +230,11 @@ public class AMQPMonitor extends PushMonitor { this.availableChannels = availableChannels; } - public MonitorPublisher getPublisher() { + public LocalEventPublisher getPublisher() { return publisher; } - public void setPublisher(MonitorPublisher publisher) { + public void setPublisher(LocalEventPublisher publisher) { this.publisher = publisher; } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java index bd5c625..4247524 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java @@ -20,7 +20,7 @@ */ package org.apache.airavata.gfac.monitor.impl.push.amqp; -import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.monitor.core.MessageParser; import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; @@ -37,9 +37,9 @@ public class BasicConsumer implements Consumer { private MessageParser parser; - private MonitorPublisher publisher; + private LocalEventPublisher publisher; - public BasicConsumer(MessageParser parser, MonitorPublisher publisher) { + public BasicConsumer(MessageParser parser, LocalEventPublisher publisher) { this.parser = parser; this.publisher = publisher; } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java index a131557..3980dac 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java @@ -21,7 +21,7 @@ package org.apache.airavata.gfac.ssh.handler; import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.gfac.core.Constants; +import org.apache.airavata.gfac.core.GFacConstants; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.cluster.RemoteCluster; import org.apache.airavata.gfac.core.context.JobExecutionContext; @@ -92,7 +92,7 @@ public class SSHOutputHandler extends AbstractHandler { String timeStampedExperimentID = GFacUtils.createUniqueNameWithDate(jobExecutionContext.getExperimentID()); TaskDetails taskData = jobExecutionContext.getTaskData(); - String outputDataDir = ServerSettings.getSetting(Constants.OUTPUT_DATA_DIR, File.separator + "tmp"); + String outputDataDir = ServerSettings.getSetting(GFacConstants.OUTPUT_DATA_DIR, File.separator + "tmp"); File localStdOutFile; File localStdErrFile; //FIXME: AdvancedOutput is remote location and third party transfer should work to make this work http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java index 38de3ba..988c604 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java @@ -26,8 +26,8 @@ import org.apache.airavata.model.workspace.experiment.TaskState; import org.apache.airavata.registry.cpi.AppCatalogException; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.MonitorPublisher; -import org.apache.airavata.gfac.core.Constants; +import org.apache.airavata.common.utils.LocalEventPublisher; +import org.apache.airavata.gfac.core.GFacConstants; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.JobDescriptor; import org.apache.airavata.gfac.core.SSHApiException; @@ -95,7 +95,7 @@ public class SSHProvider extends AbstractProvider { jobID = "SSH_" + jobExecutionContext.getHostName() + "_" + Calendar.getInstance().getTimeInMillis(); remoteCluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getRemoteCluster(); - String remoteFile = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME; + String remoteFile = jobExecutionContext.getWorkingDir() + File.separatorChar + GFacConstants.EXECUTABLE_NAME; details.setJobID(taskID); details.setJobDescription(remoteFile); jobExecutionContext.setJobDetails(details); @@ -125,7 +125,7 @@ public class SSHProvider extends AbstractProvider { /* * Execute */ - String executable = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME; + String executable = jobExecutionContext.getWorkingDir() + File.separatorChar + GFacConstants.EXECUTABLE_NAME; details.setJobDescription(executable); RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + executable + "; " + executable); StandardOutReader jobIDReaderCommandOutput = new StandardOutReader(); @@ -141,7 +141,7 @@ public class SSHProvider extends AbstractProvider { StringBuffer data = new StringBuffer(); JobDetails jobDetails = new JobDetails(); String hostAddress = jobExecutionContext.getHostName(); - MonitorPublisher monitorPublisher = jobExecutionContext.getMonitorPublisher(); + LocalEventPublisher localEventPublisher = jobExecutionContext.getLocalEventPublisher(); try { RemoteCluster remoteCluster = null; if (jobExecutionContext.getSecurityContext(hostAddress) == null) { @@ -162,11 +162,11 @@ public class SSHProvider extends AbstractProvider { if (jobID != null && !jobID.isEmpty()) { jobDetails.setJobID(jobID); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED); - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.JOBSUBMITTED)); jobExecutionContext.setJobDetails(jobDetails); if (verifyJobSubmissionByJobId(remoteCluster, jobID)) { - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.JOBSUBMITTED)); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED); } @@ -179,7 +179,7 @@ public class SSHProvider extends AbstractProvider { // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED jobID = verifyJobId; jobDetails.setJobID(jobID); - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.JOBSUBMITTED)); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED); break; @@ -193,7 +193,7 @@ public class SSHProvider extends AbstractProvider { + jobDetails.getJobName() + ", both submit and verify steps doesn't return a valid JobId. Hence changing experiment state to Failed"; log.error(msg); GFacUtils.saveErrorDetails(jobExecutionContext, msg, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED); + GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.FAILED); return; } data.append("jobDesc=").append(jobDescriptor.toXML()); @@ -303,8 +303,8 @@ public class SSHProvider extends AbstractProvider { out.write("#!/bin/bash\n".getBytes()); out.write(("cd " + jobExecutionContext.getWorkingDir() + "\n").getBytes()); - out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getInputDir() + "\n").getBytes()); - out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getOutputDir() + "\n") + out.write(("export " + GFacConstants.INPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getInputDir() + "\n").getBytes()); + out.write(("export " + GFacConstants.OUTPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getOutputDir() + "\n") .getBytes()); // get the env of the host and the application List<SetEnvPaths> envPathList = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getSetEnvironment(); http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java index 049af7f..3fb97dc 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java @@ -26,7 +26,7 @@ import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.credential.store.credential.Credential; import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; import org.apache.airavata.credential.store.store.CredentialReader; -import org.apache.airavata.gfac.core.Constants; +import org.apache.airavata.gfac.core.GFacConstants; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.RequestData; import org.apache.airavata.gfac.core.GFacUtils; @@ -151,11 +151,11 @@ public class TokenizedSSHAuthInfo implements SSHPublicKeyFileAuthentication { */ public SSHCredential getDefaultCredentials() throws GFacException, ApplicationSettingsException, IOException { Properties configurationProperties = ServerSettings.getProperties(); - String sshUserName = configurationProperties.getProperty(Constants.SSH_USER_NAME); + String sshUserName = configurationProperties.getProperty(GFacConstants.SSH_USER_NAME); this.getRequestData().setRequestUser(sshUserName); - this.privateKeyFile = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY); - this.publicKeyFile = configurationProperties.getProperty(Constants.SSH_PUBLIC_KEY); - this.passPhrase = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY_PASS); + this.privateKeyFile = configurationProperties.getProperty(GFacConstants.SSH_PRIVATE_KEY); + this.publicKeyFile = configurationProperties.getProperty(GFacConstants.SSH_PUBLIC_KEY); + this.passPhrase = configurationProperties.getProperty(GFacConstants.SSH_PRIVATE_KEY_PASS); this.getRequestData().setRequestUser(sshUserName); return new SSHCredential(IOUtil.readToByteArray(new File(this.privateKeyFile)), IOUtil.readToByteArray(new File(this.publicKeyFile)), this.passPhrase, requestData.getGatewayId(), sshUserName); } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java index 69c7df4..ce80232 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java @@ -27,7 +27,7 @@ import org.apache.airavata.registry.cpi.AppCatalogException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; -import org.apache.airavata.gfac.core.Constants; +import org.apache.airavata.gfac.core.GFacConstants; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.RequestData; import org.apache.airavata.gfac.core.JobDescriptor; @@ -111,7 +111,7 @@ public class GFACSSHUtils { if(credentials.getPrivateKey()==null || credentials.getPublicKey()==null){ // now we fall back to username password authentication Properties configurationProperties = ServerSettings.getProperties(); - tokenizedSSHAuthInfo = new DefaultPasswordAuthenticationInfo(configurationProperties.getProperty(Constants.SSH_PASSWORD)); + tokenizedSSHAuthInfo = new DefaultPasswordAuthenticationInfo(configurationProperties.getProperty(GFacConstants.SSH_PASSWORD)); } // This should be the login user name from compute resource preference String loginUser = jobExecutionContext.getLoginUserName(); http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java index 098b966..c63942d 100644 --- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java +++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java @@ -25,7 +25,7 @@ //import java.util.ArrayList; //import java.util.List; // -//import org.apache.airavata.common.utils.MonitorPublisher; +//import org.apache.airavata.common.utils.LocalEventPublisher; //import org.apache.airavata.commons.gfac.type.ActualParameter; //import org.apache.airavata.commons.gfac.type.ApplicationDescription; //import org.apache.airavata.commons.gfac.type.HostDescription; @@ -176,7 +176,7 @@ // LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler(); // localDirectorySetupHandler.invoke(jobExecutionContext); // LocalProvider localProvider = new LocalProvider(); -// localProvider.setMonitorPublisher(new MonitorPublisher(new EventBus())); +// localProvider.setLocalEventPublisher(new LocalEventPublisher(new EventBus())); // localProvider.initialize(jobExecutionContext); // localProvider.execute(jobExecutionContext); // localProvider.dispose(jobExecutionContext); http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java index 0ffa02e..6364940 100644 --- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java +++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java @@ -26,7 +26,7 @@ import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory import org.apache.airavata.gfac.core.cluster.RemoteCluster; import org.apache.airavata.gfac.gsi.ssh.impl.HPCRemoteCluster; import org.apache.airavata.registry.cpi.AppCatalog; -import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.gfac.core.JobDescriptor; import org.apache.airavata.gfac.core.SSHApiException; import org.apache.airavata.gfac.core.authentication.GSIAuthenticationInfo; @@ -68,7 +68,7 @@ public class AMQPMonitorTest { private String certificateLocation; private String pbsFilePath; private String workingDirectory; - private MonitorPublisher monitorPublisher; + private LocalEventPublisher localEventPublisher; private BlockingQueue<MonitorID> finishQueue; private BlockingQueue<MonitorID> pushQueue; private Thread pushThread; @@ -96,13 +96,13 @@ public class AMQPMonitorTest { throw new Exception("Need my proxy user name password to run tests."); } - monitorPublisher = new MonitorPublisher(new EventBus()); + localEventPublisher = new LocalEventPublisher(new EventBus()); pushQueue = new LinkedBlockingQueue<MonitorID>(); finishQueue = new LinkedBlockingQueue<MonitorID>(); final AMQPMonitor amqpMonitor = new - AMQPMonitor(monitorPublisher, + AMQPMonitor(localEventPublisher, pushQueue, finishQueue,proxyFilePath,"xsede", Arrays.asList("info1.dyn.teragrid.org,info2.dyn.teragrid.org".split(","))); try { @@ -195,7 +195,7 @@ public class AMQPMonitorTest { pushThread.interrupt(); } } - monitorPublisher.registerListener(new InnerClassAMQP()); + localEventPublisher.registerListener(new InnerClassAMQP()); // try { // pushThread.join(5000); // Iterator<MonitorID> iterator = pushQueue.iterator(); http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java index 70727f7..cc33a96 100644 --- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java +++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java @@ -26,7 +26,7 @@ //import java.util.concurrent.BlockingQueue; //import java.util.concurrent.LinkedBlockingQueue; // -//import org.apache.airavata.common.utils.MonitorPublisher; +//import org.apache.airavata.common.utils.LocalEventPublisher; //import org.apache.airavata.commons.gfac.type.HostDescription; //import org.apache.airavata.gfac.core.monitor.MonitorID; //import org.apache.airavata.gfac.monitor.HPCMonitorID; @@ -55,7 +55,7 @@ // private String pbsFilePath; // private String workingDirectory; // private HostDescription hostDescription; -// private MonitorPublisher monitorPublisher; +// private LocalEventPublisher monitorPublisher; // private BlockingQueue<UserMonitorData> pullQueue; // private Thread monitorThread; // @@ -76,7 +76,7 @@ // throw new Exception("Need my proxy user name password to run tests."); // } // -// monitorPublisher = new MonitorPublisher(new EventBus()); +// monitorPublisher = new LocalEventPublisher(new EventBus()); // class InnerClassQstat { // // @Subscribe http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 9e89788..382cd5c 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -21,6 +21,11 @@ package org.apache.airavata.gfac.server; import com.google.common.eventbus.EventBus; +import org.apache.airavata.common.exception.AiravataStartupException; +import org.apache.airavata.common.utils.LocalEventPublisher; +import org.apache.airavata.gfac.core.GFacConstants; +import org.apache.airavata.gfac.core.GFacWorker; +import org.apache.airavata.gfac.core.context.ProcessContext; import org.apache.airavata.registry.cpi.AppCatalog; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; @@ -28,16 +33,12 @@ import org.apache.airavata.common.logger.AiravataLogger; import org.apache.airavata.common.logger.AiravataLoggerFactory; import org.apache.airavata.common.utils.AiravataZKUtils; import org.apache.airavata.common.utils.Constants; -import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; -import org.apache.airavata.gfac.core.GFacConfiguration; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFac; -import org.apache.airavata.gfac.core.handler.GFacHandlerConfig; -import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.handler.ThreadedHandler; import org.apache.airavata.gfac.core.GFacThreadPoolExecutor; import org.apache.airavata.gfac.core.GFacUtils; @@ -63,18 +64,13 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.ZKPaths; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.Stat; -import org.xml.sax.SAXException; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.xpath.XPathExpressionException; import java.io.File; -import java.io.IOException; -import java.net.URL; import java.util.ArrayList; import java.util.Calendar; import java.util.HashMap; @@ -84,53 +80,67 @@ import java.util.concurrent.BlockingQueue; public class GfacServerHandler implements GfacService.Iface { private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class); - private static RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer; + private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer; private static int requestCount=0; private ExperimentCatalog experimentCatalog; private AppCatalog appCatalog; - private String gatewayName; private String airavataUserName; private CuratorFramework curatorClient; - private MonitorPublisher publisher; - private String gfacServer; - private String gfacExperiments; + private LocalEventPublisher localEventPublisher; private String airavataServerHostPort; private BlockingQueue<TaskSubmitEvent> taskSubmitEvents; private static File gfacConfigFile; private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>(); private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>(); - public GfacServerHandler() throws Exception { + public GfacServerHandler() throws AiravataStartupException { try { - - // start curator client - String zkhostPort = AiravataZKUtils.getZKhostPort(); - RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5); - curatorClient = CuratorFrameworkFactory.newClient(zkhostPort, retryPolicy); - curatorClient.start(); - gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); - gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); - airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST) - + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT); - storeServerConfig(); - publisher = new MonitorPublisher(new EventBus()); + startCuratorClient(); + initZkDataStructure(); + initAMQPClient(); + localEventPublisher = new LocalEventPublisher(new EventBus()); experimentCatalog = RegistryFactory.getDefaultExpCatalog(); appCatalog = RegistryFactory.getAppCatalog(); - setGatewayProperties(); - startDaemonHandlers(); - // initializing Better Gfac Instance - BetterGfacImpl.getInstance().init(experimentCatalog, appCatalog, curatorClient, publisher); - if (ServerSettings.isGFacPassiveMode()) { - rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); - rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler()); - } - startStatusUpdators(experimentCatalog, curatorClient, publisher, rabbitMQTaskLaunchConsumer); - + startStatusUpdators(experimentCatalog, curatorClient, localEventPublisher, rabbitMQTaskLaunchConsumer); } catch (Exception e) { - throw new Exception("Error initialising GFAC", e); + throw new AiravataStartupException("Gfac Server Initialization error ", e); } } + private void initAMQPClient() throws AiravataException { + rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); + rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler()); + } + + private void startCuratorClient() throws ApplicationSettingsException { + String connectionSting = ServerSettings.getZookeeperConnection(); + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5); + curatorClient = CuratorFrameworkFactory.newClient(connectionSting, retryPolicy); + curatorClient.start(); + } + + private void initZkDataStructure() throws Exception { + /* + *|/servers + * - /gfac + * - /gfac-node0 (localhost:2181) + *|/experiments + */ + airavataServerHostPort = ServerSettings.getGfacServerHost() + ":" + ServerSettings.getGFacServerPort(); + // create PERSISTENT nodes + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacUtils.getZKGfacServersParentPath()); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacConstants.ZOOKEEPER_EXPERIMENT_NODE); + // create EPHEMERAL server name node + String gfacName = ServerSettings.getGFacServerName(); + if (curatorClient.checkExists().forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? gfacName : "/" + gfacName)) == null) { + curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) + .forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? gfacName : "/" + gfacName)); + + } + curatorClient.setData().withVersion(-1).forPath(GFacUtils.getZKGfacServersParentPath() + + (gfacName.startsWith("/") ? gfacName : "/" + gfacName), new String(airavataServerHostPort).getBytes()); + } + public static void main(String[] args) { RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = null; try { @@ -140,29 +150,6 @@ public class GfacServerHandler implements GfacService.Iface { logger.error(e.getMessage(), e); } } - private void storeServerConfig() throws Exception { - Stat stat = curatorClient.checkExists().forPath(gfacServer); - if (stat == null) { - curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) - .forPath(gfacServer, new byte[0]); - } - String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME); - String instanceNode = gfacServer + File.separator + instanceId; - stat = curatorClient.checkExists().forPath(instanceNode); - if (stat == null) { - curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(instanceNode, airavataServerHostPort.getBytes()); - curatorClient.getChildren().watched().forPath(instanceNode); - } - stat = curatorClient.checkExists().forPath(gfacExperiments); - if (stat == null) { - curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(gfacExperiments, airavataServerHostPort.getBytes()); - } - stat = curatorClient.checkExists().forPath(gfacExperiments + File.separator + instanceId); - if (stat == null) { - curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) - .forPath(gfacExperiments + File.separator + instanceId, airavataServerHostPort.getBytes()); - } - } private long ByateArrayToLong(byte[] data) { long value = 0; @@ -190,20 +177,27 @@ public class GfacServerHandler implements GfacService.Iface { * * * * * - * @param experimentId - * @param taskId - * @param gatewayId + * @param experimentId - ExperimentModel id in registry + * @param processId - processModel id in registry + * @param gatewayId - gateway Identification */ - public boolean submitJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException { + public boolean submitJob(String experimentId, String processId, String gatewayId, String tokenId) throws TException { requestCount++; logger.info("-----------------------------------------------------" + requestCount + "-----------------------------------------------------"); - logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} TaskId: {}", experimentId, taskId); + logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} process: {}", experimentId, processId); + ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId); + processContext.setAppCatalog(appCatalog); + processContext.setExperimentCatalog(experimentCatalog); + processContext.setCuratorClient(curatorClient); + processContext.setLocalEventPublisher(localEventPublisher); + + GFacWorker worker = new GFacWorker(processContext); InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(BetterGfacImpl.getInstance(), experimentId, - taskId, gatewayId, tokenId); + processId, gatewayId, tokenId); // try { // if( gfac.submitJob(experimentId, taskId, gatewayId)){ logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " + - "{}", experimentId, taskId, gatewayId); + "{}", experimentId, processId, gatewayId); GFacThreadPoolExecutor.getCachedThreadPool().execute(inputHandlerWorker); @@ -235,60 +229,14 @@ public class GfacServerHandler implements GfacService.Iface { this.experimentCatalog = experimentCatalog; } - public String getGatewayName() { - return gatewayName; - } - - public void setGatewayName(String gatewayName) { - this.gatewayName = gatewayName; - } - - public String getAiravataUserName() { - return airavataUserName; - } - - public void setAiravataUserName(String airavataUserName) { - this.airavataUserName = airavataUserName; - } - - protected void setGatewayProperties() throws ApplicationSettingsException { - setAiravataUserName(ServerSettings.getDefaultUser()); - setGatewayName(ServerSettings.getDefaultUserGateway()); - } private GFac getGfac() throws TException { GFac gFac = BetterGfacImpl.getInstance(); - gFac.init(experimentCatalog, appCatalog, curatorClient, publisher); + gFac.init(experimentCatalog, appCatalog, curatorClient, localEventPublisher); return gFac; } - public void startDaemonHandlers() { - List<GFacHandlerConfig> daemonHandlerConfig = null; - String className = null; - try { - URL resource = GfacServerHandler.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); - if (resource != null) { - gfacConfigFile = new File(resource.getPath()); - } - daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile); - for (GFacHandlerConfig handlerConfig : daemonHandlerConfig) { - className = handlerConfig.getClassName(); - Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class); - ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance(); - threadedHandler.initProperties(handlerConfig.getProperties()); - daemonHandlers.add(threadedHandler); - } - } catch (ParserConfigurationException | IOException | XPathExpressionException | ClassNotFoundException | - InstantiationException | IllegalAccessException | GFacHandlerException | SAXException e) { - logger.error("Error parsing gfac-config.xml, double check the xml configuration", e); - } - for (ThreadedHandler tHandler : daemonHandlers) { - (new Thread(tHandler)).start(); - } - } - - - public static void startStatusUpdators(ExperimentCatalog experimentCatalog, CuratorFramework curatorClient, MonitorPublisher publisher, + public static void startStatusUpdators(ExperimentCatalog experimentCatalog, CuratorFramework curatorClient, LocalEventPublisher publisher, RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) { try { @@ -337,9 +285,9 @@ public class GfacServerHandler implements GfacService.Iface { private String experimentNode; private String nodeName; - public TaskLaunchMessageHandler() { - experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); - nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME,"gfac-node0"); + public TaskLaunchMessageHandler() throws ApplicationSettingsException { + experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE; + nodeName = ServerSettings.getGFacServerName(); } public Map<String, Object> getProperties() { @@ -366,7 +314,6 @@ public class GfacServerHandler implements GfacService.Iface { status.setExperimentState(ExperimentState.EXECUTING); status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, event.getExperimentId()); - experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); try { GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), curatorClient, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag()); http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java ---------------------------------------------------------------------- diff --git a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java index e133be3..0f929df 100644 --- a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java +++ b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java @@ -21,7 +21,7 @@ package org.apache.airavata.workflow.engine.util; -import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.common.utils.listener.AbstractActivityListener; public class ProxyMonitorPublisher implements AbstractActivityListener{ @@ -33,11 +33,11 @@ public class ProxyMonitorPublisher implements AbstractActivityListener{ setupConfigurations=configurations; } - private static MonitorPublisher getPublisher(){ + private static LocalEventPublisher getPublisher(){ if (setupConfigurations!=null) { for (Object configuration : setupConfigurations) { - if (configuration instanceof MonitorPublisher){ - return (MonitorPublisher) configuration; + if (configuration instanceof LocalEventPublisher){ + return (LocalEventPublisher) configuration; } } }
