This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch helix-integration in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 71075e0d6bfcf00047fe271188eff3cf46a8982b Author: dimuthu <[email protected]> AuthorDate: Mon Mar 5 23:09:33 2018 -0500 Logging improvements --- .../helix/core/participant/HelixParticipant.java | 1 + .../airavata/helix/impl/task/AiravataTask.java | 83 +++++++++++++++------- .../airavata/helix/impl/task/CompletingTask.java | 5 +- .../airavata/helix/impl/task/EnvSetupTask.java | 5 +- .../helix/impl/task/InputDataStagingTask.java | 4 +- .../helix/impl/task/OutputDataStagingTask.java | 7 +- .../submission/task/DefaultJobSubmissionTask.java | 5 +- .../submission/task/ForkJobSubmissionTask.java | 5 +- .../submission/task/LocalJobSubmissionTask.java | 5 +- .../helix/impl/workflow/PostWorkflowManager.java | 5 +- .../src/main/resources/log4j.properties | 2 +- 11 files changed, 84 insertions(+), 43 deletions(-) diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java index 190b866..503f5ca 100644 --- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java +++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java @@ -142,6 +142,7 @@ public class HelixParticipant <T extends AbstractTask> implements Runnable { // register task model machineEngine.registerStateModelFactory("Task", new TaskStateModelFactory(zkHelixManager, getTaskFactory())); + logger.debug("Participant: " + participantName + ", registered state model factories."); zkHelixManager.connect(); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java index 03dedf3..289cfc5 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java @@ -4,6 +4,7 @@ import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.helix.core.AbstractTask; import org.apache.airavata.helix.core.OutPort; +import org.apache.airavata.helix.task.api.TaskHelper; import org.apache.airavata.helix.task.api.annotation.TaskOutPort; import org.apache.airavata.helix.task.api.annotation.TaskParam; import org.apache.airavata.messaging.core.MessageContext; @@ -22,6 +23,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.task.TaskResult; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.log4j.MDC; import java.io.PrintWriter; import java.io.StringWriter; @@ -34,7 +36,6 @@ public abstract class AiravataTask extends AbstractTask { private ExperimentCatalog experimentCatalog; private Publisher statusPublisher; private ProcessModel processModel; - private ComputeResourceDescription computeResourceDescription; private TaskContext taskContext; @@ -52,7 +53,7 @@ public abstract class AiravataTask extends AbstractTask { private OutPort nextTask; protected TaskResult onSuccess(String message) { - String successMessage = "Task " + getTaskId() + " completed." + message != null ? " Message : " + message : ""; + String successMessage = "Task " + getTaskId() + " completed." + (message != null ? " Message : " + message : ""); logger.info(successMessage); return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED, message)); } @@ -89,14 +90,14 @@ public abstract class AiravataTask extends AbstractTask { return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, errorMessage); } - public void saveAndPublishProcessStatus(ProcessState state) { + protected void saveAndPublishProcessStatus(ProcessState state) { ProcessStatus processStatus = new ProcessStatus(state); processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); getTaskContext().setProcessStatus(processStatus); saveAndPublishProcessStatus(); } - public void saveAndPublishProcessStatus() { + protected void saveAndPublishProcessStatus() { try { ProcessStatus status = taskContext.getProcessStatus(); if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){ @@ -116,7 +117,7 @@ public abstract class AiravataTask extends AbstractTask { } } - public void saveAndPublishTaskStatus() { + protected void saveAndPublishTaskStatus() { try { TaskState state = getTaskContext().getTaskState(); // first we save job jobModel to the registry for sa and then save the job status. @@ -139,7 +140,7 @@ public abstract class AiravataTask extends AbstractTask { } } - public void saveExperimentError(ErrorModel errorModel) { + protected void saveExperimentError(ErrorModel errorModel) { try { errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR")); getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, errorModel, experimentId); @@ -149,7 +150,7 @@ public abstract class AiravataTask extends AbstractTask { } } - public void saveProcessError(ErrorModel errorModel) { + protected void saveProcessError(ErrorModel errorModel) { try { errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR")); experimentCatalog.add(ExpCatChildDataType.PROCESS_ERROR, errorModel, getProcessId()); @@ -160,7 +161,7 @@ public abstract class AiravataTask extends AbstractTask { } } - public void saveTaskError(ErrorModel errorModel) throws Exception { + protected void saveTaskError(ErrorModel errorModel) throws Exception { try { errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR")); getExperimentCatalog().add(ExpCatChildDataType.TASK_ERROR, errorModel, getTaskId()); @@ -171,7 +172,7 @@ public abstract class AiravataTask extends AbstractTask { } } - public Publisher getStatusPublisher() throws AiravataException { + protected Publisher getStatusPublisher() throws AiravataException { if (statusPublisher == null) { synchronized (RabbitMQPublisher.class) { if (statusPublisher == null) { @@ -183,10 +184,47 @@ public abstract class AiravataTask extends AbstractTask { } @Override + public TaskResult onRun(TaskHelper helper) { + + try { + MDC.put("experiment", getExperimentId()); + MDC.put("process", getProcessId()); + MDC.put("gateway", getGatewayId()); + MDC.put("task", getTaskId()); + return onRun(helper, getTaskContext()); + } finally { + MDC.clear(); + } + } + + public abstract TaskResult onRun(TaskHelper helper, TaskContext taskContext); + + @Override + public void onCancel() { + try { + MDC.put("experiment", getExperimentId()); + MDC.put("process", getProcessId()); + MDC.put("gateway", getGatewayId()); + MDC.put("task", getTaskId()); + onCancel(getTaskContext()); + } finally { + MDC.clear(); + } + } + + public abstract void onCancel(TaskContext taskContext); + + + @Override public void init(HelixManager manager, String workflowName, String jobName, String taskName) { super.init(manager, workflowName, jobName, taskName); + MDC.put("experiment", getExperimentId()); + MDC.put("process", getProcessId()); + MDC.put("gateway", getGatewayId()); + MDC.put("task", getTaskId()); try { appCatalog = RegistryFactory.getAppCatalog(); + //logger.info("Gateway id is " + getGatewayId()); experimentCatalog = RegistryFactory.getExperimentCatalog(getGatewayId()); processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId); @@ -208,12 +246,12 @@ public abstract class AiravataTask extends AbstractTask { .getStoragePreference(gatewayId, processModel.getStorageResourceId())); this.taskContext = taskContextBuilder.build(); - } catch (AppCatalogException e) { - e.printStackTrace(); - } catch (RegistryException e) { - e.printStackTrace(); + logger.info("Task " + taskName + " intitialized"); } catch (Exception e) { - e.printStackTrace(); + logger.error("Error occurred while initializing the task " + getTaskId() + " of experiment " + getExperimentId(), e); + throw new RuntimeException("Error occurred while initializing the task " + getTaskId() + " of experiment " + getExperimentId(), e); + } finally { + MDC.clear(); } } @@ -236,20 +274,15 @@ public abstract class AiravataTask extends AbstractTask { msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); } - ////////////////////////// - - public ComputeResourceDescription getComputeResourceDescription() { + protected ComputeResourceDescription getComputeResourceDescription() { return computeResourceDescription; } - //////////////////////// - - - public TaskContext getTaskContext() { + protected TaskContext getTaskContext() { return taskContext; } - public ExperimentCatalog getExperimentCatalog() { + protected ExperimentCatalog getExperimentCatalog() { return experimentCatalog; } @@ -261,7 +294,7 @@ public abstract class AiravataTask extends AbstractTask { this.processId = processId; } - public String getExperimentId() { + protected String getExperimentId() { return experimentId; } @@ -269,7 +302,7 @@ public abstract class AiravataTask extends AbstractTask { this.experimentId = experimentId; } - public String getGatewayId() { + protected String getGatewayId() { return gatewayId; } @@ -277,7 +310,7 @@ public abstract class AiravataTask extends AbstractTask { this.gatewayId = gatewayId; } - public ProcessModel getProcessModel() { + protected ProcessModel getProcessModel() { return processModel; } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/CompletingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/CompletingTask.java index 9ec2909..d036258 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/CompletingTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/CompletingTask.java @@ -13,14 +13,15 @@ public class CompletingTask extends AiravataTask { private static final Logger logger = LogManager.getLogger(CompletingTask.class); @Override - public TaskResult onRun(TaskHelper helper) { + public TaskResult onRun(TaskHelper helper, TaskContext taskContext) { + logger.info("Starting completing task for task " + getTaskId() + ", experiment id " + getExperimentId()); logger.info("Process " + getProcessId() + " successfully completed"); saveAndPublishProcessStatus(ProcessState.COMPLETED); return onSuccess("Process " + getProcessId() + " successfully completed"); } @Override - public void onCancel() { + public void onCancel(TaskContext taskContext) { } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java index abdc1bf..0ad5698 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java @@ -16,7 +16,7 @@ public class EnvSetupTask extends AiravataTask { private static final Logger logger = LogManager.getLogger(EnvSetupTask.class); @Override - public TaskResult onRun(TaskHelper taskHelper) { + public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) { try { saveAndPublishProcessStatus(ProcessState.CONFIGURING_WORKSPACE); @@ -32,6 +32,7 @@ public class EnvSetupTask extends AiravataTask { adaptor.createDirectory(getTaskContext().getWorkingDir()); publishTaskState(TaskState.COMPLETED); return onSuccess("Envi setup task successfully completed " + getTaskId()); + } catch (Exception e) { try { publishTaskState(TaskState.FAILED); @@ -45,7 +46,7 @@ public class EnvSetupTask extends AiravataTask { } @Override - public void onCancel() { + public void onCancel(TaskContext taskContext) { } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java index ed143dd..2c885f4 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java @@ -23,7 +23,7 @@ public class InputDataStagingTask extends DataStagingTask { private static final Logger logger = LogManager.getLogger(InputDataStagingTask.class); @Override - public TaskResult onRun(TaskHelper taskHelper) { + public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) { logger.info("Starting Input Data Staging Task " + getTaskId()); saveAndPublishProcessStatus(ProcessState.INPUT_DATA_STAGING); @@ -110,7 +110,7 @@ public class InputDataStagingTask extends DataStagingTask { } @Override - public void onCancel() { + public void onCancel(TaskContext taskContext) { } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java index ff8fd2e..738d955 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java @@ -27,9 +27,9 @@ public class OutputDataStagingTask extends DataStagingTask { private static final Logger logger = LogManager.getLogger(OutputDataStagingTask.class); @Override - public TaskResult onRun(TaskHelper taskHelper) { + public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) { - logger.info("Starting output data staging task " + getTaskId()); + logger.info("Starting output data staging task " + getTaskId() + " in experiment " + getExperimentId()); saveAndPublishProcessStatus(ProcessState.OUTPUT_DATA_STAGING); try { @@ -51,7 +51,6 @@ public class OutputDataStagingTask extends DataStagingTask { } // Fetch and validate storage resource - // Fetch and validate storage resource StorageResourceDescription storageResource = getStorageResource(); // Fetch and validate source and destination URLS @@ -212,7 +211,7 @@ public class OutputDataStagingTask extends DataStagingTask { } @Override - public void onCancel() { + public void onCancel(TaskContext taskContext) { } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java index 688f894..9b015bb 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java @@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task.submission.task; import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.agents.api.JobSubmissionOutput; import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.helix.impl.task.TaskContext; import org.apache.airavata.helix.impl.task.submission.GroovyMapBuilder; import org.apache.airavata.helix.impl.task.submission.GroovyMapData; import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo; @@ -29,7 +30,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { public static final String DEFAULT_JOB_ID = "DEFAULT_JOB_ID"; @Override - public TaskResult onRun(TaskHelper taskHelper) { + public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) { try { saveAndPublishProcessStatus(ProcessState.EXECUTING); @@ -227,7 +228,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { } @Override - public void onCancel() { + public void onCancel(TaskContext taskContext) { } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java index e3b5447..afce74e 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java @@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task.submission.task; import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.agents.api.JobSubmissionOutput; import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.helix.impl.task.TaskContext; import org.apache.airavata.helix.impl.task.submission.GroovyMapBuilder; import org.apache.airavata.helix.impl.task.submission.GroovyMapData; import org.apache.airavata.helix.impl.task.submission.SubmissionUtil; @@ -21,7 +22,7 @@ import java.util.Arrays; public class ForkJobSubmissionTask extends JobSubmissionTask { @Override - public TaskResult onRun(TaskHelper taskHelper) { + public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) { try { GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build(); @@ -76,7 +77,7 @@ public class ForkJobSubmissionTask extends JobSubmissionTask { } @Override - public void onCancel() { + public void onCancel(TaskContext taskContext) { } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java index cea6750..3e51b4f 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java @@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task.submission.task; import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.agents.api.JobSubmissionOutput; import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.helix.impl.task.TaskContext; import org.apache.airavata.helix.impl.task.submission.GroovyMapBuilder; import org.apache.airavata.helix.impl.task.submission.GroovyMapData; import org.apache.airavata.helix.impl.task.submission.SubmissionUtil; @@ -23,7 +24,7 @@ import java.util.UUID; public class LocalJobSubmissionTask extends JobSubmissionTask { @Override - public TaskResult onRun(TaskHelper taskHelper) { + public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) { try { GroovyMapData groovyMapData = new GroovyMapData(); @@ -81,7 +82,7 @@ public class LocalJobSubmissionTask extends JobSubmissionTask { } @Override - public void onCancel() { + public void onCancel(TaskContext taskContext) { } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java index 07a9aee..b4ffacf 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java @@ -126,7 +126,7 @@ public class PostWorkflowManager { String status = getStatusByJobId(jobStatusResult.getJobId()); logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId() + " with process id " - + processId + ", gateway " + gateway + " and status " + status); + + processId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name()); // TODO get cluster lock before that if ("cancelled".equals(status)) { @@ -181,6 +181,9 @@ public class PostWorkflowManager { completingTask.setExperimentId(experimentModel.getExperimentId()); completingTask.setProcessId(processModel.getProcessId()); completingTask.setTaskId("Completing-Task"); + if (allTasks.size() > 0) { + allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(completingTask.getTaskId(), completingTask)); + } allTasks.add(completingTask); WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", diff --git a/modules/helix-spectator/src/main/resources/log4j.properties b/modules/helix-spectator/src/main/resources/log4j.properties index 69a4301..dba82a1 100644 --- a/modules/helix-spectator/src/main/resources/log4j.properties +++ b/modules/helix-spectator/src/main/resources/log4j.properties @@ -10,4 +10,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. log4j.appender.A1.layout=org.apache.log4j.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] [E=%X{experiment},P=%X{process},T=%X{task},G=%X{gateway}] %-5p %c %x - %m%n \ No newline at end of file -- To stop receiving notification emails like this one, please contact [email protected].
