This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch helix-integration in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 42ff5f4e2fbdbb1421f686b702a5eb76918bb4d3 Author: dimuthu <[email protected]> AuthorDate: Sun Mar 4 13:21:55 2018 -0500 Implementing post workflow --- modules/helix-spectator/pom.xml | 11 +- .../airavata/helix/impl/task/AiravataTask.java | 2 +- .../helix/impl/task/OutputDataStagingTask.java | 52 ++++- .../submission/task/DefaultJobSubmissionTask.java | 3 + .../task/submission/task/JobSubmissionTask.java | 32 +++ .../helix/impl/workflow/PostWorkflowManager.java | 256 +++++++++++++++++++++ ...SimpleWorkflow.java => PreWorkflowManager.java} | 2 +- modules/job-monitor/pom.xml | 5 + .../airavata/job/monitor/EmailBasedMonitor.java | 7 +- .../monitor/kafka/JobStatusResultDeserializer.java | 34 +++ .../monitor/kafka/JobStatusResultSerializer.java | 29 +++ .../job/monitor/kafka/MessageProducer.java | 36 +++ 12 files changed, 460 insertions(+), 9 deletions(-) diff --git a/modules/helix-spectator/pom.xml b/modules/helix-spectator/pom.xml index 36fb586..213f747 100644 --- a/modules/helix-spectator/pom.xml +++ b/modules/helix-spectator/pom.xml @@ -50,6 +50,15 @@ <artifactId>groovy-templates</artifactId> <version>2.4.7</version> </dependency> - + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>1.0.0</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>job-monitor</artifactId> + <version>0.17-SNAPSHOT</version> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java index 26361d2..e15195d 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java @@ -79,7 +79,7 @@ public abstract class AiravataTask extends AbstractTask { super.init(manager, workflowName, jobName, taskName); try { appCatalog = RegistryFactory.getAppCatalog(); - experimentCatalog = RegistryFactory.getDefaultExpCatalog(); + experimentCatalog = RegistryFactory.getExperimentCatalog(getGatewayId()); processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId); this.computeResourceDescription = getAppCatalog().getComputeResource().getComputeResource(getProcessModel() diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java index d2280d0..f33523c 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java @@ -28,6 +28,7 @@ public class OutputDataStagingTask extends DataStagingTask { @Override public TaskResult onRun(TaskHelper taskHelper) { + logger.info("Starting output data staging task " + getTaskId()); try { // Get and validate data staging task model DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel(); @@ -56,14 +57,37 @@ public class OutputDataStagingTask extends DataStagingTask { String sourceFileName; try { sourceURI = new URI(dataStagingTaskModel.getSource()); - destinationURI = new URI(dataStagingTaskModel.getDestination()); + sourceFileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1, + sourceURI.getPath().length()); + + if (dataStagingTaskModel.getDestination().startsWith("dummy")) { + String inputPath = getTaskContext().getStorageFileSystemRootLocation(); + inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator); + String experimentDataDir = getProcessModel().getExperimentDataDir(); + String filePath; + if(experimentDataDir != null && !experimentDataDir.isEmpty()) { + if(!experimentDataDir.endsWith(File.separator)){ + experimentDataDir += File.separator; + } + if (experimentDataDir.startsWith(File.separator)) { + filePath = experimentDataDir + sourceFileName; + } else { + filePath = inputPath + experimentDataDir + sourceFileName; + } + } else { + filePath = inputPath + getProcessId() + File.separator + sourceFileName; + } + + destinationURI = new URI("file", getTaskContext().getStorageResourceLoginUserName(), + storageResource.getHostName(), 22, filePath, null, null); + + } else { + destinationURI = new URI(dataStagingTaskModel.getDestination()); + } if (logger.isDebugEnabled()) { logger.debug("Source file " + sourceURI.getPath() + ", destination uri " + destinationURI.getPath() + " for task " + getTaskId()); } - - sourceFileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1, - sourceURI.getPath().length()); } catch (URISyntaxException e) { throw new TaskOnFailException("Failed to obtain source URI for output data staging task " + getTaskId(), true, e); } @@ -164,6 +188,26 @@ public class OutputDataStagingTask extends DataStagingTask { } } + public URI getDestinationURIFromDummy(String hostName, String inputPath, String fileName) throws URISyntaxException { + String experimentDataDir = getProcessModel().getExperimentDataDir(); + String filePath; + if(experimentDataDir != null && !experimentDataDir.isEmpty()) { + if(!experimentDataDir.endsWith(File.separator)){ + experimentDataDir += File.separator; + } + if (experimentDataDir.startsWith(File.separator)) { + filePath = experimentDataDir + fileName; + } else { + filePath = inputPath + experimentDataDir + fileName; + } + } else { + filePath = inputPath + getProcessId() + File.separator + fileName; + } + //FIXME + return new URI("file", getTaskContext().getStorageResourceLoginUserName(), hostName, 22, filePath, null, null); + + } + @Override public void onCancel() { diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java index e21f200..a60a955 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java @@ -36,6 +36,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { @Override public TaskResult onRun(TaskHelper taskHelper) { + try { GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build(); @@ -126,6 +127,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { logger.info("Received job id " + jobId + " from compute resource"); jobModel.setJobId(jobId); saveJobModel(jobModel); + JobStatus jobStatus = new JobStatus(); jobStatus.setJobState(JobState.SUBMITTED); jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName()); @@ -139,6 +141,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); jobModel.setJobStatuses(Arrays.asList(jobStatus)); saveJobStatus(jobModel); + createMonitoringNode(jobId); } if (getComputeResourceDescription().isGatewayUsageReporting()){ diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java index ac314e9..afa2630 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java @@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task.submission.task; import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.agents.api.CommandOutput; import org.apache.airavata.agents.api.JobSubmissionOutput; +import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.helix.impl.task.AiravataTask; @@ -27,9 +28,15 @@ import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.status.JobStatus; import org.apache.airavata.registry.cpi.*; import org.apache.commons.io.FileUtils; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.helix.HelixManager; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; import java.io.File; import java.security.SecureRandom; @@ -39,9 +46,34 @@ public abstract class JobSubmissionTask extends AiravataTask { private static final Logger logger = LogManager.getLogger(JobSubmissionTask.class); + private CuratorFramework curatorClient = null; + @Override public void init(HelixManager manager, String workflowName, String jobName, String taskName) { super.init(manager, workflowName, jobName, taskName); + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + try { + this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy); + this.curatorClient.start(); + } catch (ApplicationSettingsException e) { + e.printStackTrace(); + logger.error("Failed to create curator client ", e); + throw new RuntimeException(e); + } + } + + public CuratorFramework getCuratorClient() { + return curatorClient; + } + + // TODO perform exception handling + protected void createMonitoringNode(String jobId) throws Exception { + logger.info("Creating zookeeper paths for job monitoring for job id : " + jobId + ", process : " + + getProcessId() + ", gateway : " + getGatewayId()); + this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/lock", new byte[0]); + this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/gateway", getGatewayId().getBytes()); + this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/process", getProcessId().getBytes()); + this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/status", "pending".getBytes()); } ////////////////////// diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java new file mode 100644 index 0000000..25f8ec5 --- /dev/null +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java @@ -0,0 +1,256 @@ +package org.apache.airavata.helix.impl.workflow; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.helix.core.OutPort; +import org.apache.airavata.helix.impl.task.AiravataTask; +import org.apache.airavata.helix.impl.task.EnvSetupTask; +import org.apache.airavata.helix.impl.task.InputDataStagingTask; +import org.apache.airavata.helix.impl.task.OutputDataStagingTask; +import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask; +import org.apache.airavata.helix.impl.task.submission.task.JobSubmissionTask; +import org.apache.airavata.helix.workflow.WorkflowManager; +import org.apache.airavata.job.monitor.kafka.JobStatusResultDeserializer; +import org.apache.airavata.job.monitor.parser.JobStatusResult; +import org.apache.airavata.model.experiment.ExperimentModel; +import org.apache.airavata.model.process.ProcessModel; +import org.apache.airavata.model.status.JobState; +import org.apache.airavata.model.task.TaskModel; +import org.apache.airavata.model.task.TaskTypes; +import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.AppCatalog; +import org.apache.airavata.registry.cpi.ExperimentCatalog; +import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.zookeeper.data.Stat; + +import java.util.*; +import java.util.stream.Collectors; + +public class PostWorkflowManager { + + private static final Logger logger = LogManager.getLogger(PostWorkflowManager.class); + + private final String BOOTSTRAP_SERVERS = "localhost:9092"; + private final String TOPIC = "parsed-data"; + + private CuratorFramework curatorClient = null; + + private void init() throws ApplicationSettingsException { + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy); + this.curatorClient.start(); + } + + private Consumer<String, JobStatusResult> createConsumer() { + final Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "MonitoringConsumer"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JobStatusResultDeserializer.class.getName()); + // Create the consumer using props. + final Consumer<String, JobStatusResult> consumer = new KafkaConsumer<String, JobStatusResult>(props); + // Subscribe to the topic. + consumer.subscribe(Collections.singletonList(TOPIC)); + return consumer; + } + + private String getProcessIdByJobId(String jobId) throws Exception { + byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/process"); + String process = new String(processBytes); + return process; + } + + private String getGatewayByJobId(String jobId) throws Exception { + byte[] gatewayBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/gateway"); + String gateway = new String(gatewayBytes); + return gateway; + } + + private String getStatusByJobId(String jobId) throws Exception { + byte[] statusBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/status"); + String status = new String(statusBytes); + return status; + } + + private boolean hasMonitoringRegistered(String jobId) throws Exception { + Stat stat = this.curatorClient.checkExists().forPath("/monitoring/" + jobId); + return stat != null; + } + + private void process(JobStatusResult jobStatusResult) { + + if (jobStatusResult == null) { + return; + } + + try { + logger.info("Processing job result " + jobStatusResult.getJobId()); + + if (hasMonitoringRegistered(jobStatusResult.getJobId())) { + String gateway = getGatewayByJobId(jobStatusResult.getJobId()); + String processId = getProcessIdByJobId(jobStatusResult.getJobId()); + String status = getStatusByJobId(jobStatusResult.getJobId()); + + // TODO get cluster lock before that + if ("cancelled".equals(status)) { + + } else { + + if (jobStatusResult.getState() == JobState.COMPLETE) { + logger.info("Job " + jobStatusResult.getJobId() + " was completed"); + + ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(gateway); + ProcessModel processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId); + ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, processModel.getExperimentId()); + String taskDag = processModel.getTaskDag(); + List<TaskModel> taskList = processModel.getTasks(); + + String[] taskIds = taskDag.split(","); + final List<AiravataTask> allTasks = new ArrayList<>(); + + boolean jobSubmissionFound = false; + + for (String taskId : taskIds) { + Optional<TaskModel> model = taskList.stream().filter(taskModel -> taskModel.getTaskId().equals(taskId)).findFirst(); + + if (model.isPresent()) { + TaskModel taskModel = model.get(); + AiravataTask airavataTask = null; + if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION) { + jobSubmissionFound = true; + } else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) { + if (jobSubmissionFound) { + airavataTask = new OutputDataStagingTask(); + } + } + + if (airavataTask != null) { + airavataTask.setGatewayId(experimentModel.getGatewayId()); + airavataTask.setExperimentId(experimentModel.getExperimentId()); + airavataTask.setProcessId(processModel.getProcessId()); + airavataTask.setTaskId(taskModel.getTaskId()); + if (allTasks.size() > 0) { + allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(airavataTask.getTaskId(), airavataTask)); + } + allTasks.add(airavataTask); + } + } + } + WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", + "wm-23", ServerSettings.getZookeeperConnection()); + + workflowManager.launchWorkflow(UUID.randomUUID().toString(), + allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true); + + } else if (jobStatusResult.getState() == JobState.CANCELED) { + logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled"); + + } else if (jobStatusResult.getState() == JobState.FAILED) { + logger.info("Job " + jobStatusResult.getJobId() + " was failed"); + + } else if (jobStatusResult.getState() == JobState.SUBMITTED) { + logger.info("Job " + jobStatusResult.getJobId() + " was submitted"); + + } + } + } else { + logger.warn("Could not find a monitoring register for job id " + jobStatusResult.getJobId()); + } + } catch (Exception e) { + logger.error("Failed to process job : " + jobStatusResult.getJobId() + ", with status : " + jobStatusResult.getState().name(), e); + } + } + + private void runConsumer() throws InterruptedException { + final Consumer<String, JobStatusResult> consumer = createConsumer(); + + final int giveUp = 100; int noRecordsCount = 0; + + while (true) { + final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(1000); + + /*if (consumerRecords.count() == 0) { + noRecordsCount++; + if (noRecordsCount > giveUp) break; + else continue; + }*/ + + consumerRecords.forEach(record -> { + process(record.value()); + }); + + consumer.commitAsync(); + } + //consumer.close(); + //System.out.println("DONE"); + } + + public static void main(String[] args) throws Exception { + + PostWorkflowManager postManager = new PostWorkflowManager(); + postManager.init(); + postManager.runConsumer(); + /* + String processId = "PROCESS_5b252ad9-d630-4cf9-80e3-0c30c55d1001"; + ExperimentCatalog experimentCatalog = RegistryFactory.getDefaultExpCatalog(); + + ProcessModel processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId); + ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, processModel.getExperimentId()); + String taskDag = processModel.getTaskDag(); + List<TaskModel> taskList = processModel.getTasks(); + + String[] taskIds = taskDag.split(","); + final List<AiravataTask> allTasks = new ArrayList<>(); + + boolean jobSubmissionFound = false; + + for (String taskId : taskIds) { + Optional<TaskModel> model = taskList.stream().filter(taskModel -> taskModel.getTaskId().equals(taskId)).findFirst(); + + if (model.isPresent()) { + TaskModel taskModel = model.get(); + AiravataTask airavataTask = null; + if (taskModel.getTaskType() == TaskTypes.ENV_SETUP) { + //airavataTask = new EnvSetupTask(); + } else if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION) { + //airavataTask = new DefaultJobSubmissionTask(); + //airavataTask.setRetryCount(1); + jobSubmissionFound = true; + } else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) { + if (jobSubmissionFound) { + airavataTask = new OutputDataStagingTask(); + } else { + //airavataTask = new InputDataStagingTask(); + } + } + + if (airavataTask != null) { + airavataTask.setGatewayId(experimentModel.getGatewayId()); + airavataTask.setExperimentId(experimentModel.getExperimentId()); + airavataTask.setProcessId(processModel.getProcessId()); + airavataTask.setTaskId(taskModel.getTaskId()); + if (allTasks.size() > 0) { + allTasks.get(allTasks.size() -1).setNextTask(new OutPort(airavataTask.getTaskId(), airavataTask)); + } + allTasks.add(airavataTask); + } + } + } + + WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-22", "localhost:2199"); + workflowManager.launchWorkflow(UUID.randomUUID().toString(), allTasks.stream().map(t -> (AiravataTask)t).collect(Collectors.toList()), true); + */ + } +} diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java similarity index 99% rename from modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java rename to modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java index abd36e1..9814b01 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java @@ -23,7 +23,7 @@ import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; -public class SimpleWorkflow { +public class PreWorkflowManager { public static void main(String[] args) throws Exception { diff --git a/modules/job-monitor/pom.xml b/modules/job-monitor/pom.xml index c536a14..7a69882 100644 --- a/modules/job-monitor/pom.xml +++ b/modules/job-monitor/pom.xml @@ -33,6 +33,11 @@ <artifactId>snakeyaml</artifactId> <version>1.15</version> </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>1.0.0</version> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/EmailBasedMonitor.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/EmailBasedMonitor.java index 7b13354..e41f500 100644 --- a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/EmailBasedMonitor.java +++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/EmailBasedMonitor.java @@ -2,6 +2,7 @@ package org.apache.airavata.job.monitor; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.job.monitor.kafka.MessageProducer; import org.apache.airavata.job.monitor.parser.EmailParser; import org.apache.airavata.job.monitor.parser.JobStatusResult; import org.apache.airavata.job.monitor.parser.ResourceConfig; @@ -48,6 +49,7 @@ public class EmailBasedMonitor implements Runnable { private Map<String, Boolean> canceledJobs = new ConcurrentHashMap<>(); private Timer timer; private Map<ResourceJobManagerType, ResourceConfig> resourceConfigs = new HashMap<>(); + private MessageProducer messageProducer = new MessageProducer(); public EmailBasedMonitor() throws Exception { @@ -235,8 +237,9 @@ public class EmailBasedMonitor implements Runnable { try { JobStatusResult jobStatusResult = parse(message); log.info(jobStatusResult.getJobId() + ", " + jobStatusResult.getJobName() + ", " + jobStatusResult.getState().getValue()); - //processedMessages.add(message); - unreadMessages.add(message); + messageProducer.submitMessageToQueue(jobStatusResult); + processedMessages.add(message); + //unreadMessages.add(message); } catch (Exception e) { unreadMessages.add(message); } diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java new file mode 100644 index 0000000..c3c7877 --- /dev/null +++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java @@ -0,0 +1,34 @@ +package org.apache.airavata.job.monitor.kafka; + +import org.apache.airavata.job.monitor.parser.JobStatusResult; +import org.apache.airavata.model.status.JobState; +import org.apache.kafka.common.serialization.Deserializer; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.util.Map; + +public class JobStatusResultDeserializer implements Deserializer<JobStatusResult> { + @Override + public void configure(Map<String, ?> map, boolean b) { + + } + + @Override + public JobStatusResult deserialize(String s, byte[] bytes) { + String deserializedData = new String(bytes); + String[] parts = deserializedData.split(","); + JobStatusResult jobStatusResult = new JobStatusResult(); + jobStatusResult.setJobId(parts[0]); + jobStatusResult.setJobName(parts[1]); + jobStatusResult.setState(JobState.valueOf(parts[2])); + return jobStatusResult; + } + + @Override + public void close() { + + } +} diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java new file mode 100644 index 0000000..a0dc6ec --- /dev/null +++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java @@ -0,0 +1,29 @@ +package org.apache.airavata.job.monitor.kafka; + +import org.apache.airavata.job.monitor.parser.JobStatusResult; +import org.apache.kafka.common.serialization.Serializer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.util.Map; + +public class JobStatusResultSerializer implements Serializer<JobStatusResult> { + + @Override + public void configure(Map<String, ?> map, boolean b) { + + } + + @Override + public byte[] serialize(String s, JobStatusResult jobStatusResult) { + String serializedData = jobStatusResult.getJobId() + "," + jobStatusResult.getJobName() + "," + jobStatusResult.getState().name(); + return serializedData.getBytes(); + } + + @Override + public void close() { + + } +} diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java new file mode 100644 index 0000000..748a533 --- /dev/null +++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java @@ -0,0 +1,36 @@ +package org.apache.airavata.job.monitor.kafka; + +import org.apache.airavata.job.monitor.parser.JobStatusResult; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +public class MessageProducer { + private final static String TOPIC = "parsed-data"; + private final static String BOOTSTRAP_SERVERS = "localhost:9092"; + + final Producer<String, JobStatusResult> producer; + + public MessageProducer() { + producer = createProducer(); + } + + private Producer<String, JobStatusResult> createProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + BOOTSTRAP_SERVERS); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + JobStatusResultSerializer.class.getName()); + return new KafkaProducer<String, JobStatusResult>(props); + } + + public void submitMessageToQueue(JobStatusResult jobStatusResult) throws ExecutionException, InterruptedException { + final ProducerRecord<String, JobStatusResult> record = new ProducerRecord<>(TOPIC, jobStatusResult); + RecordMetadata recordMetadata = producer.send(record).get(); + } +} -- To stop receiving notification emails like this one, please contact [email protected].
