This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch staging in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/staging by this push: new e3b77ae Platform monitor initial version e3b77ae is described below commit e3b77ae2b072c68996ac02abe90d5e33523414d8 Author: Dimuthu Wannipurage <dimuthu.wannipur...@datasprouts.com> AuthorDate: Mon Nov 19 01:59:05 2018 -0500 Platform monitor initial version --- .../helix/impl/participant/GlobalParticipant.java | 3 +- .../airavata/helix/impl/task/mock/MockTask.java | 20 +++++ modules/airavata-helix/platform-monitor/pom.xml | 21 +++++ .../helix/cluster/monitoring/ApiServerMonitor.java | 44 +++++++++ .../helix/cluster/monitoring/DbMonitor.java | 45 ++++++++++ .../helix/cluster/monitoring/ErrorNotifier.java | 17 ++++ .../cluster/monitoring/HelixControllerMonitor.java | 15 ++++ .../monitoring/HelixParticipantMonitor.java | 100 +++++++++++++++++++++ .../helix/cluster/monitoring/MainMonitor.java | 30 +++++++ .../helix/cluster/monitoring/PlatformMonitor.java | 5 ++ .../cluster/monitoring/PlatformMonitorError.java | 46 ++++++++++ .../helix/cluster/monitoring/ZookeeperMonitor.java | 44 +++++++++ .../src/main/resources/airavata-server.properties | 7 ++ .../src/main/resources/logback.xml | 96 ++++++++++++++++++++ modules/airavata-helix/pom.xml | 1 + .../airavata/helix/workflow/WorkflowOperator.java | 8 +- 16 files changed, 499 insertions(+), 3 deletions(-) diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java index a970d1e..c704a9c 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java @@ -44,7 +44,8 @@ public class GlobalParticipant extends HelixParticipant<AbstractTask> {
"org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask", "org.apache.airavata.helix.impl.task.cancel.RemoteJobCancellationTask", "org.apache.airavata.helix.impl.task.cancel.CancelCompletingTask", - "org.apache.airavata.helix.impl.task.parsing.DataParsingTask" + "org.apache.airavata.helix.impl.task.parsing.DataParsingTask", + "org.apache.airavata.helix.impl.task.mock.MockTask" }; @SuppressWarnings("WeakerAccess") diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/mock/MockTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/mock/MockTask.java new file mode 100644 index 0000000..2603881 --- /dev/null +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/mock/MockTask.java @@ -0,0 +1,21 @@ +package org.apache.airavata.helix.impl.task.mock; + +import org.apache.airavata.helix.core.AbstractTask; +import org.apache.airavata.helix.task.api.TaskHelper; +import org.apache.airavata.helix.task.api.annotation.TaskDef; +import org.apache.helix.task.TaskResult; + +/** No-op task that always succeeds; HelixParticipantMonitor runs it in a mock workflow to verify that the Helix cluster can execute tasks end to end. */ +@TaskDef(name = "Mock Task") +public class MockTask extends AbstractTask { + + @Override + public TaskResult onRun(TaskHelper helper) { + return onSuccess("Successfully executed Mock Task"); + } + + @Override + public void onCancel() { + // Nothing to cancel: onRun returns immediately. + } +} \ No newline at end of file diff --git a/modules/airavata-helix/platform-monitor/pom.xml b/modules/airavata-helix/platform-monitor/pom.xml new file mode 100644 index 0000000..d8ed166 --- /dev/null +++ b/modules/airavata-helix/platform-monitor/pom.xml @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>airavata-helix</artifactId> + <groupId>org.apache.airavata</groupId> +
<version>0.17-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>platform-monitor</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>helix-spectator</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/ApiServerMonitor.java b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/ApiServerMonitor.java new file mode 100644 index 0000000..6c50c66 --- /dev/null +++ b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/ApiServerMonitor.java @@ -0,0 +1,54 @@ +package org.apache.airavata.helix.cluster.monitoring; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; + +/** + * Checks that the Airavata API server accepts TCP connections on the configured + * api.server.host and api.server.port. + */ +public class ApiServerMonitor implements PlatformMonitor { + + private final static Logger logger = LoggerFactory.getLogger(ApiServerMonitor.class); + + // Upper bound on the connect attempt so an unreachable host cannot stall the monitoring schedule + private static final int CONNECT_TIMEOUT_MS = 10000; + + private String apiServerHost = ServerSettings.getSetting("api.server.host"); + private String apiServerPort = ServerSettings.getSetting("api.server.port"); + + public ApiServerMonitor() throws ApplicationSettingsException { + } + + /** + * Opens (and immediately closes) a TCP connection to the API server and reports an + * ApiServer/AS001 error through the notifier if that fails. + */ + @Override + public void monitor(ErrorNotifier notifier) { + + logger.info("Monitoring API Server started"); + + try (Socket s = new Socket()) { + s.connect(new InetSocketAddress(apiServerHost, Integer.parseInt(apiServerPort)), CONNECT_TIMEOUT_MS); + } catch (IOException | IllegalArgumentException e) { + // IllegalArgumentException covers a missing host or a malformed/out-of-range port; letting it + // escape would make the scheduled executor suppress every later run of this monitor + PlatformMonitorError monitorError = new PlatformMonitorError(); + monitorError.setError(e); + monitorError.setReason("Could not establish a connection with Api Server " + apiServerHost + ":" + apiServerPort); + monitorError.setCategory("ApiServer"); + monitorError.setErrorCode("AS001"); + notifier.sendNotification(monitorError); + } + + logger.info("Monitoring API Server finished"); + + } +}
diff --git a/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/DbMonitor.java b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/DbMonitor.java new file mode 100644 index 0000000..b236be5 --- /dev/null +++ b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/DbMonitor.java @@ -0,0 +1,54 @@ +package org.apache.airavata.helix.cluster.monitoring; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; + +/** + * Checks that the database server accepts TCP connections on the configured + * database.host and database.port. + */ +public class DbMonitor implements PlatformMonitor { + + private final static Logger logger = LoggerFactory.getLogger(DbMonitor.class); + + // Upper bound on the connect attempt so an unreachable host cannot stall the monitoring schedule + private static final int CONNECT_TIMEOUT_MS = 10000; + + private String dbServerHost = ServerSettings.getSetting("database.host"); + private String dbPort = ServerSettings.getSetting("database.port"); + + public DbMonitor() throws ApplicationSettingsException { + } + + /** + * Opens (and immediately closes) a TCP connection to the database server and reports a + * Database/DB001 error through the notifier if that fails. + */ + @Override + public void monitor(ErrorNotifier notifier) { + + logger.info("Monitoring Database Server started"); + + try (Socket s = new Socket()) { + s.connect(new InetSocketAddress(dbServerHost, Integer.parseInt(dbPort)), CONNECT_TIMEOUT_MS); + } catch (IOException | IllegalArgumentException e) { + // IllegalArgumentException covers a missing host or a malformed/out-of-range port; letting it + // escape would make the scheduled executor suppress every later run of this monitor + PlatformMonitorError monitorError = new PlatformMonitorError(); + monitorError.setError(e); + monitorError.setReason("Could not establish a connection with Database " + dbServerHost + ":" + dbPort); + monitorError.setCategory("Database"); + monitorError.setErrorCode("DB001"); + notifier.sendNotification(monitorError); + } + + logger.info("Monitoring Database Server finished"); + + }
+} diff --git a/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/ErrorNotifier.java b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/ErrorNotifier.java new file mode 100644 index 0000000..8f892bf --- /dev/null +++ b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/ErrorNotifier.java @@ -0,0 +1,17 @@ +package org.apache.airavata.helix.cluster.monitoring; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ErrorNotifier { + + private final static Logger logger = LoggerFactory.getLogger(ErrorNotifier.class); + + public void sendNotification(PlatformMonitorError monitorError) { + if (monitorError.getError() == null) { + logger.error("Monitor error " + monitorError.getReason()); + } else { + logger.error("Monitor error " + monitorError.getReason(), monitorError.getError()); + } + } +} diff --git a/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/HelixControllerMonitor.java b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/HelixControllerMonitor.java new file mode 100644 index 0000000..3f4944d --- /dev/null +++ b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/HelixControllerMonitor.java @@ -0,0 +1,15 @@ +package org.apache.airavata.helix.cluster.monitoring; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HelixControllerMonitor implements PlatformMonitor { + + private final static Logger logger = LoggerFactory.getLogger(HelixControllerMonitor.class); + + @Override + public void monitor(ErrorNotifier notifier) { + logger.info("Monitoring Controller started"); + logger.info("Monitoring Controller finished"); + } +} \ No newline at end of file diff --git
a/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/HelixParticipantMonitor.java b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/HelixParticipantMonitor.java new file mode 100644 index 0000000..568db01 --- /dev/null +++ b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/HelixParticipantMonitor.java @@ -0,0 +1,127 @@ +package org.apache.airavata.helix.cluster.monitoring; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.helix.impl.task.mock.MockTask; +import org.apache.airavata.helix.workflow.WorkflowOperator; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.helix.manager.zk.ZNRecordSerializer; +import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.task.TaskState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.UUID; + +/** + * Checks that the configured Helix participant is enabled and that the Helix cluster can run a + * single-task mock workflow to completion. + */ +public class HelixParticipantMonitor implements PlatformMonitor { + + private final static Logger logger = LoggerFactory.getLogger(HelixParticipantMonitor.class); + + // Maximum time to wait for the mock workflow to reach a terminal state + private static final long MOCK_WORKFLOW_TIMEOUT_MS = 120000; + + private String helixClusterName = ServerSettings.getSetting("helix.cluster.name"); + private String instanceName = ServerSettings.getSetting("helix.participant.name"); + private String zkConnectionString = ServerSettings.getZookeeperConnection(); + + // Created on first use and then reused, instead of building a new operator (presumably holding its own + // Helix connection, which was never released) on every run. Assumes monitor() is not invoked concurrently; + // MainMonitor gives each monitor its own single-thread scheduler. + private WorkflowOperator workflowOperator; + + public HelixParticipantMonitor() throws ApplicationSettingsException { + } + + @Override + public void monitor(ErrorNotifier notifier) { + + logger.info("Monitoring Participant started"); + + PlatformMonitorError monitorError = checkConnectivity(); + if (monitorError != null) notifier.sendNotification(monitorError); + monitorError = checkMockWorkflow(); + if (monitorError != null) notifier.sendNotification(monitorError); + + logger.info("Monitoring Participant finished"); + + } + + // Returns a P001 error if the participant is disabled, P002 if its configuration cannot be read, otherwise null + private PlatformMonitorError checkConnectivity() { + ZkClient zkclient = null; + try { + zkclient = new ZkClient(zkConnectionString, ZkClient.DEFAULT_SESSION_TIMEOUT, + ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer()); + ZKHelixAdmin admin = new ZKHelixAdmin(zkclient); + + InstanceConfig instanceConfig = admin.getInstanceConfig(helixClusterName, instanceName); + + // Read HELIX_ENABLED through the model API; scraping the serialized record threw whenever the + // field (or a newline after it) was missing + if (!instanceConfig.getInstanceEnabled()) { + PlatformMonitorError monitorError = new PlatformMonitorError(); + monitorError.setReason("Helix participant " + instanceName + " is not active"); + monitorError.setCategory("Participant"); + monitorError.setErrorCode("P001"); + return monitorError; + } + } catch (Exception e) { + PlatformMonitorError monitorError = new PlatformMonitorError(); + monitorError.setError(e); + monitorError.setReason("Failed to fetch Helix participant " + instanceName + " information"); + monitorError.setCategory("Participant"); + monitorError.setErrorCode("P002"); + return monitorError; + } finally { + // A new ZooKeeper client is created on every run; close it so sessions and threads do not leak + if (zkclient != null) { + try { + zkclient.close(); + } catch (Exception e) { + logger.warn("Failed to close ZooKeeper client", e); + } + } + } + return null; + } + + // Returns a P003 error if the mock workflow ends in a state other than COMPLETED, P004 if it cannot be run, otherwise null + private PlatformMonitorError checkMockWorkflow() { + MockTask mockTask = new MockTask(); + mockTask.setTaskId("Mock-" + UUID.randomUUID().toString()); + try { + if (workflowOperator == null) { + workflowOperator = new WorkflowOperator(helixClusterName, "mock-wf-operator", zkConnectionString); + } + String workflow = workflowOperator.launchWorkflow(UUID.randomUUID().toString(), Collections.singletonList(mockTask), true, false); + TaskState state = workflowOperator.pollForWorkflowCompletion(workflow, MOCK_WORKFLOW_TIMEOUT_MS); + if (state != TaskState.COMPLETED) { + PlatformMonitorError monitorError = new PlatformMonitorError(); + monitorError.setReason("Mock workflow failed to execute with status " + state.name() + ". " + + "Check whether Helix cluster is working properly"); + monitorError.setCategory("Participant"); + monitorError.setErrorCode("P003"); + return monitorError; + } + } catch (Exception e) { + if (e instanceof InterruptedException) { + // Restore the interrupt flag so the executor can still stop this thread + Thread.currentThread().interrupt(); + } + PlatformMonitorError monitorError = new PlatformMonitorError(); + monitorError.setError(e); + monitorError.setReason("Failed to launch mock workflow on helix cluster " + helixClusterName + ". " + + "Check whether Helix cluster is working properly including the availability Controller and Participant"); + monitorError.setCategory("Participant"); + monitorError.setErrorCode("P004"); + return monitorError; + } + return null; + } +} \ No newline at end of file
diff --git a/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/MainMonitor.java b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/MainMonitor.java new file mode 100644 index 0000000..f15ded9 --- /dev/null +++ b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/MainMonitor.java @@ -0,0 +1,45 @@ +package org.apache.airavata.helix.cluster.monitoring; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Entry point of the platform monitor: repeatedly runs each PlatformMonitor, with a one-minute delay + * between runs and one single-thread scheduler per monitor, reporting problems through a shared ErrorNotifier. + */ +public class MainMonitor { + + private final static Logger logger = LoggerFactory.getLogger(MainMonitor.class); + + public static void main(String[] args) throws ApplicationSettingsException { + + List<PlatformMonitor> platformMonitors = Arrays.asList(new ApiServerMonitor(), + new DbMonitor(), new HelixControllerMonitor(), + new HelixParticipantMonitor(), new ZookeeperMonitor()); + + ErrorNotifier errorNotifier = new ErrorNotifier(); + + for (PlatformMonitor monitor : platformMonitors) { + // A dedicated thread per monitor keeps a slow check from delaying the others + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + executorService.scheduleWithFixedDelay(() -> runSafely(monitor, errorNotifier), 0, 1, TimeUnit.MINUTES); + } + } + + // scheduleWithFixedDelay suppresses all later runs of a task once one run throws, so an unexpected + // exception from a single check is logged here instead of silently disabling that monitor + private static void runSafely(PlatformMonitor monitor, ErrorNotifier notifier) { + try { + monitor.monitor(notifier); + } catch (RuntimeException e) { + logger.error("Monitor " + monitor.getClass().getSimpleName() + " failed unexpectedly", e); + } + } +} \ No newline at end of file
diff --git a/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/PlatformMonitor.java b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/PlatformMonitor.java new file mode 100644 index 0000000..ee9e1b6 --- /dev/null +++ b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/PlatformMonitor.java @@ -0,0 +1,5 @@ +package org.apache.airavata.helix.cluster.monitoring; + +public interface PlatformMonitor { + public void monitor(ErrorNotifier notifier); +} diff --git a/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/PlatformMonitorError.java b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/PlatformMonitorError.java new file mode 100644 index 0000000..076d8b9 --- /dev/null +++ b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/PlatformMonitorError.java @@ -0,0 +1,46 @@ +package org.apache.airavata.helix.cluster.monitoring; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PlatformMonitorError { + + private final static Logger logger = LoggerFactory.getLogger(PlatformMonitorError.class); + + private String reason; + private String errorCode; + private String category; + private Throwable error; + + public String getReason() { + return reason; + } + + public void setReason(String reason) { + this.reason = reason; + } + + public String getErrorCode() { + return errorCode; + } + + public void setErrorCode(String errorCode) { + this.errorCode = errorCode; + } + + public String getCategory() { + return category; + } + + public void setCategory(String category) { + this.category = category; + } + + public Throwable getError() { + return error; + } + + public void setError(Throwable error) { + this.error = error; + } +} \ No newline at end of file
diff --git a/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/ZookeeperMonitor.java b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/ZookeeperMonitor.java new file mode 100644 index 0000000..4a70018 --- /dev/null +++ b/modules/airavata-helix/platform-monitor/src/main/java/org/apache/airavata/helix/cluster/monitoring/ZookeeperMonitor.java @@ -0,0 +1,74 @@ +package org.apache.airavata.helix.cluster.monitoring; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; + +/** + * Checks that the first ZooKeeper server listed in the configured connection string accepts TCP connections. + */ +public class ZookeeperMonitor implements PlatformMonitor { + + private final static Logger logger = LoggerFactory.getLogger(ZookeeperMonitor.class); + + // Upper bound on the connect attempt so an unreachable host cannot stall the monitoring schedule + private static final int CONNECT_TIMEOUT_MS = 10000; + // Port the ZooKeeper client assumes when a connection-string entry does not name one + private static final int DEFAULT_ZK_PORT = 2181; + + private String zkConnection = ServerSettings.getZookeeperConnection(); + + public ZookeeperMonitor() throws ApplicationSettingsException { + } + + @Override + public void monitor(ErrorNotifier notifier) { + + logger.info("Monitoring Zookeeper started"); + + try (Socket s = new Socket()) { + s.connect(firstServerAddress(zkConnection), CONNECT_TIMEOUT_MS); + } catch (IOException | IllegalArgumentException e) { + PlatformMonitorError monitorError = new PlatformMonitorError(); + monitorError.setError(e); + monitorError.setReason("Could not establish a connection with Zookeeper " + zkConnection); + monitorError.setCategory("Zookeeper"); + monitorError.setErrorCode("ZK001"); + notifier.sendNotification(monitorError); + } + + logger.info("Monitoring Zookeeper finished"); + + } + + /** + * Resolves the first server of a ZooKeeper connection string such as + * host1:2181,host2:2181/chroot. + * + * @throws IllegalArgumentException if no server is given or its port is not a valid port number + */ + private static InetSocketAddress firstServerAddress(String connection) { + String server = connection == null ? "" : connection.trim(); + int comma = server.indexOf(','); + if (comma >= 0) { + server = server.substring(0, comma).trim(); + } + int chrootStart = server.indexOf('/'); + if (chrootStart >= 0) { + server = server.substring(0, chrootStart); + } + if (server.isEmpty()) { + throw new IllegalArgumentException("No Zookeeper server in connection string: " + connection); + } + int separator = server.lastIndexOf(':'); + if (separator < 0) { + return new InetSocketAddress(server, DEFAULT_ZK_PORT); + } + return new InetSocketAddress(server.substring(0, separator), Integer.parseInt(server.substring(separator + 1))); + } +} \ No newline at end of file diff --git
a/modules/airavata-helix/platform-monitor/src/main/resources/airavata-server.properties b/modules/airavata-helix/platform-monitor/src/main/resources/airavata-server.properties new file mode 100644 index 0000000..7e4c435 --- /dev/null +++ b/modules/airavata-helix/platform-monitor/src/main/resources/airavata-server.properties @@ -0,0 +1,7 @@ +api.server.host=api.staging.scigap.org +api.server.port=9930 +database.host=db.staging.scigap.org +database.port=3306 +helix.cluster.name=AiravataDemoCluster +helix.participant.name=helixparticipant +zookeeper.server.connection=api.staging.scigap.org:2181 \ No newline at end of file diff --git a/modules/airavata-helix/platform-monitor/src/main/resources/logback.xml b/modules/airavata-helix/platform-monitor/src/main/resources/logback.xml new file mode 100644 index 0000000..071bed2 --- /dev/null +++ b/modules/airavata-helix/platform-monitor/src/main/resources/logback.xml @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<configuration> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern> + </encoder> + </appender> + + <appender name="LOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <File>../logs/airavata.log</File> + <Append>true</Append> + <encoder> + <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern> + </encoder> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>../logs/airavata.log.%d{yyyy-MM-dd}</fileNamePattern> + <maxHistory>30</maxHistory> + <totalSizeCap>1GB</totalSizeCap> + </rollingPolicy> + </appender> + + <appender name="KAFKA_FAILED_LOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <File>../logs/kafka-failed.log</File> + <Append>true</Append> + <encoder> + <pattern>%d [%t] %-5p %c{30} %X - %m%n</pattern> + </encoder> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>../logs/kafka-failed.log.%d{yyyy-MM-dd}</fileNamePattern> + <maxHistory>30</maxHistory> + <totalSizeCap>1GB</totalSizeCap> + </rollingPolicy> + </appender> + + + <appender name="RELAXED-KAFKA-APPENDER" class="com.github.danielwegener.logback.kafka.KafkaAppender"> + <encoder class="net.logstash.logback.encoder.LogstashEncoder"> + <pattern>%d [%t] %-5p %c{30} %X - %m%n</pattern> + </encoder> + <topic>airavata-logs</topic> + <!-- we don't care how the log messages will be partitioned --> + <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" /> + + <!-- use async delivery. 
the application threads are not blocked by logging --> + <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" /> + + <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) --> + <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs --> + <!-- bootstrap.servers is the only mandatory producerConfig --> + <producerConfig>bootstrap.servers=192.168.99.103:9092</producerConfig> + <!-- don't wait for a broker to ack the reception of a batch. --> + <producerConfig>acks=0</producerConfig> + <!-- wait up to 1000ms and collect log messages before sending them as a batch --> + <producerConfig>linger.ms=1000</producerConfig> + <!-- even if the producer buffer runs full, do not block the application but start to drop messages --> + <producerConfig>max.block.ms=0</producerConfig> + <!-- define a client-id that you use to identify yourself against the kafka broker --> + <producerConfig>client.id=${HOSTNAME}-${CONTEXT_NAME}-logback-relaxed</producerConfig> + + <!-- fallback <appender-ref>: messages this appender cannot deliver to Kafka are written to KAFKA_FAILED_LOGFILE instead.
--> + <appender-ref ref="KAFKA_FAILED_LOGFILE"/> + </appender> + + <logger name="ch.qos.logback" level="WARN"/> + <logger name="org.apache.helix" level="WARN"/> + <logger name="org.apache.zookeeper" level="ERROR"/> + <logger name="org.apache.airavata" level="INFO"/> + <logger name="org.hibernate" level="ERROR"/> + <root level="INFO"> + <appender-ref ref="RELAXED-KAFKA-APPENDER"/> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="LOGFILE"/> + </root> +</configuration> \ No newline at end of file diff --git a/modules/airavata-helix/pom.xml b/modules/airavata-helix/pom.xml index 21f76f2..b86b21a 100644 --- a/modules/airavata-helix/pom.xml +++ b/modules/airavata-helix/pom.xml @@ -42,6 +42,7 @@ <module>workflow-impl</module> <module>helix-spectator</module> <module>helix-distribution</module> + <module>platform-monitor</module> </modules> </project> \ No newline at end of file diff --git a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowOperator.java b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowOperator.java index 3008bac..612fe60 100644 --- a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowOperator.java +++ b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowOperator.java @@ -115,12 +115,16 @@ public class WorkflowOperator { // if the hfac that monitors a particular workflow, got killed due to some reason, who is taking the responsibility if (monitor) { - TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(), - TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED); + TaskState taskState = pollForWorkflowCompletion(workflow.getName(), 3600000); logger.info("Workflow " + workflowName + " for process " + processId + " finished with state " + taskState.name()); } return workflowName; } + // Waits for the workflow to reach COMPLETED, FAILED, STOPPED or ABORTED and returns that state. NOTE(review): Helix's TaskDriver is expected to throw (HelixException) rather than return if the timeout elapses first, and synchronized keeps this operator locked for the whole wait - confirm both are intended. + public synchronized TaskState
pollForWorkflowCompletion(String workflowName, long timeout) throws InterruptedException { + return taskDriver.pollForWorkflowState(workflowName, timeout, TaskState.COMPLETED, + TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED); + } } \ No newline at end of file