This is an automated email from the ASF dual-hosted git repository. yasith pushed a commit to branch cybershuttle-dev in repository https://gitbox.apache.org/repos/asf/airavata.git
commit d10b7e1320e129c8a247da8897535ccb2a20b7f3 Author: yasithdev <[email protected]> AuthorDate: Fri Mar 21 23:27:03 2025 +0000 fix issues with helix participant and job engine starter --- .../helix/impl/controller/HelixController.java | 60 ++++++-------- .../helix/impl/participant/GlobalParticipant.java | 5 +- .../helix/core/participant/HelixParticipant.java | 93 ++++++++++++---------- .../airavata/ide/integration/JobEngineStarter.java | 26 ++---- pom.xml | 2 +- 5 files changed, 85 insertions(+), 101 deletions(-) diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java index ee4656ea1c..be3c11fbe9 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java @@ -23,15 +23,12 @@ import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.helix.controller.HelixControllerMain; import org.apache.helix.manager.zk.ZKHelixAdmin; -import org.apache.helix.manager.zk.ZNRecordSerializer; -import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.IdealState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; /** * TODO: Class level comments please @@ -43,13 +40,13 @@ public class HelixController implements Runnable { private final static Logger logger = LoggerFactory.getLogger(HelixController.class); - private String clusterName; - private String controllerName; - private String zkAddress; + private final String clusterName; + private final String controllerName; + private final String zkAddress; private org.apache.helix.HelixManager zkHelixManager; - private CountDownLatch startLatch = new CountDownLatch(1); - private CountDownLatch stopLatch = new CountDownLatch(1); + private final CountDownLatch startLatch = new CountDownLatch(1); + private final CountDownLatch stopLatch = new CountDownLatch(1); @SuppressWarnings("WeakerAccess") public HelixController() throws ApplicationSettingsException { @@ -60,50 +57,43 @@ public class HelixController implements Runnable { public void run() { try { - ZkClient zkClient = new ZkClient(ServerSettings.getZookeeperConnection(), ZkClient.DEFAULT_SESSION_TIMEOUT, - ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer()); - ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient); - - // Creates the zk cluster if not available - if (! zkHelixAdmin.getClusters().contains(clusterName)) { - zkHelixAdmin.addCluster(clusterName, true); - } - + logger.info("Zookeeper connection string: {}", zkAddress); + logger.info("Helix cluster: {}", clusterName); + logger.info("Helix controller: {}", controllerName); + ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin.Builder().setZkAddress(this.zkAddress).build(); + logger.info("[zkAdmin] started"); + logger.info("[zkAdmin] Cluster: {} adding if not available...", clusterName); + zkHelixAdmin.addCluster(this.clusterName, true); + logger.info("[zkAdmin] Cluster: {} now available!", clusterName); zkHelixAdmin.close(); - zkClient.close(); + logger.info("[zkAdmin] closed"); - logger.info("Connection to helix cluster : " + clusterName + " with name : " + controllerName); - logger.info("Zookeeper connection string " + zkAddress); - - zkHelixManager = HelixControllerMain.startHelixController(zkAddress, clusterName, - controllerName, HelixControllerMain.STANDALONE); + logger.info("helix controller - starting..."); + zkHelixManager = HelixControllerMain.startHelixController(zkAddress, clusterName, controllerName, HelixControllerMain.STANDALONE); + logger.info("Helix controller - started"); startLatch.countDown(); stopLatch.await(); } catch (Exception ex) { - logger.error("Error in run() for Controller: " + controllerName + ", reason: " + ex, ex); + logger.error("Error in run() for Controller: {}, reason: {}", controllerName, ex, ex); } finally { disconnect(); } } + @SuppressWarnings("RedundantThrows") public void startServer() throws Exception { - //WorkflowCleanupAgent cleanupAgent = new WorkflowCleanupAgent(); - //cleanupAgent.init(); - //ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - //executor.scheduleWithFixedDelay(cleanupAgent, 10, 120, TimeUnit.SECONDS); - new Thread(this).start(); try { startLatch.await(); - logger.info("Controller: " + controllerName + ", has connected to cluster: " + clusterName); + logger.info("Controller: {}, has connected to cluster: {}", controllerName, clusterName); Runtime.getRuntime().addShutdownHook( new Thread(this::disconnect) ); } catch (InterruptedException ex) { - logger.error("Controller: " + controllerName + ", is interrupted! reason: " + ex, ex); + logger.error("Controller: {}, is interrupted! reason: {}", controllerName, ex, ex); } } @@ -114,12 +104,12 @@ public class HelixController implements Runnable { private void disconnect() { if (zkHelixManager != null) { - logger.info("Controller: " + controllerName + ", has disconnected from cluster: " + clusterName); + logger.info("Controller: {}, has disconnected from cluster: {}", controllerName, clusterName); zkHelixManager.disconnect(); } } - public static void main(String args[]) { + public static void main(String[] args) { try { logger.info("Starting helix controller"); diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java index fb439f45e7..54a657315b 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java @@ -62,18 +62,19 @@ public class GlobalParticipant extends HelixParticipant<AbstractTask> { t.start(); } + @SuppressWarnings("unused") public void stopServer() { } - public static void main(String args[]) { + public static void main(String[] args) { logger.info("Starting global participant"); try { ArrayList<Class<? extends AbstractTask>> taskClasses = new ArrayList<>(); for (String taskClassName : TASK_CLASS_NAMES) { - logger.debug("Adding task class: " + taskClassName + " to the global participant"); + logger.debug("Adding task class: {} to the global participant", taskClassName); taskClasses.add(Class.forName(taskClassName).asSubclass(AbstractTask.class)); } diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java index 378e678020..8bf040f513 100644 --- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java +++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java @@ -23,22 +23,22 @@ import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.helix.core.support.TaskHelperImpl; import org.apache.airavata.helix.core.AbstractTask; -import org.apache.airavata.helix.core.util.PropertyResolver; import org.apache.airavata.helix.task.api.annotation.TaskDef; import org.apache.helix.InstanceType; import org.apache.helix.examples.OnlineOfflineStateModelFactory; -import org.apache.helix.manager.zk.ZKHelixAdmin; -import org.apache.helix.manager.zk.ZKHelixManager; -import org.apache.helix.manager.zk.ZNRecordSerializer; -import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.InstanceConfig; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.task.TaskFactory; import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.helix.manager.zk.ZKHelixManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.InvocationTargetException; import java.util.*; /** @@ -54,15 +54,16 @@ public class HelixParticipant<T extends AbstractTask> implements Runnable { private int shutdownGracePeriod = 30000; private int shutdownGraceRetries = 2; - private String zkAddress; - private String clusterName; - private String participantName; + private final String zkAddress; + private final String clusterName; + private final String participantName; + private final String taskTypeName; + private ZKHelixManager zkHelixManager; - private String taskTypeName; - private List<Class<? extends T>> taskClasses; - private final List<String> runningTasks = Collections.synchronizedList(new ArrayList<String>()); + private final List<Class<? extends T>> taskClasses; + private final List<String> runningTasks = Collections.synchronizedList(new ArrayList<>()); public HelixParticipant(List<Class<? extends T>> taskClasses, String taskTypeName) throws ApplicationSettingsException { @@ -75,38 +76,41 @@ public class HelixParticipant<T extends AbstractTask> implements Runnable { this.taskTypeName = taskTypeName; this.taskClasses = taskClasses; - logger.info("Zookeeper connection URL " + zkAddress); - logger.info("Cluster name " + clusterName); - logger.info("Participant name " + participantName); - logger.info("Task type " + taskTypeName); + logger.info("Zookeeper connection URL {}", zkAddress); + logger.info("Cluster name {}", clusterName); + logger.info("Participant name {}", participantName); + logger.info("Task type {}", taskTypeName); if (taskClasses != null) { for (Class<? extends T> taskClass : taskClasses) { - logger.info("Task classes include: " + taskClass.getCanonicalName()); + logger.info("Task classes include: {}", taskClass.getCanonicalName()); } } } + @SuppressWarnings({"WeakerAccess", "unused"}) public HelixParticipant(Class<T> taskClass, String taskTypeName) throws ApplicationSettingsException { this(taskClass != null ? Collections.singletonList(taskClass) : null, taskTypeName); } + @SuppressWarnings({"WeakerAccess", "unused"}) public void setShutdownGracePeriod(int shutdownGracePeriod) { this.shutdownGracePeriod = shutdownGracePeriod; } + @SuppressWarnings({"WeakerAccess", "unused"}) public void setShutdownGraceRetries(int shutdownGraceRetries) { this.shutdownGraceRetries = shutdownGraceRetries; } public void registerRunningTask(AbstractTask task) { runningTasks.add(task.getTaskId()); - logger.info("Registered Task " + task.getTaskId() + ". Currently available " + runningTasks.size()); + logger.info("Registered Task {}. Currently available {}", task.getTaskId(), runningTasks.size()); } public void unregisterRunningTask(AbstractTask task) { runningTasks.remove(task.getTaskId()); - logger.info("Un registered Task " + task.getTaskId() + ". Currently available " + runningTasks.size()); + logger.info("Un registered Task {}. Currently available {}", task.getTaskId(), runningTasks.size()); } @SuppressWarnings("WeakerAccess") @@ -116,54 +120,58 @@ public class HelixParticipant<T extends AbstractTask> implements Runnable { for (Class<? extends T> taskClass : taskClasses) { TaskFactory taskFac = context -> { try { - return AbstractTask.class.cast(taskClass.newInstance()) + return taskClass.getDeclaredConstructor().newInstance() .setParticipant(HelixParticipant.this) .setCallbackContext(context) .setTaskHelper(new TaskHelperImpl()); - } catch (InstantiationException | IllegalAccessException e) { - logger.error("Failed to initialize the task: " + context.getTaskConfig().getId(), e); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | + InvocationTargetException e) { + logger.error("Failed to initialize the task: {}", context.getTaskConfig().getId(), e); return null; } }; TaskDef taskDef = taskClass.getAnnotation(TaskDef.class); - taskRegistry.put(taskDef.name(), taskFac); + if (taskDef != null) { + taskRegistry.put(taskDef.name(), taskFac); + } else { + logger.warn("Task class {} is not annotated with @TaskDef", taskClass.getCanonicalName()); + } } return taskRegistry; } public void run() { - ZkClient zkClient = null; + ZkClient zkClient = null; try { zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer()); - ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient); + ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin.Builder().setZkAddress(zkAddress).build(); List<String> nodesInCluster = zkHelixAdmin.getInstancesInCluster(clusterName); if (!nodesInCluster.contains(participantName)) { InstanceConfig instanceConfig = new InstanceConfig(participantName); instanceConfig.setHostName("localhost"); - instanceConfig.setInstanceEnabled(true); if (taskTypeName != null) { instanceConfig.addTag(taskTypeName); } zkHelixAdmin.addInstance(clusterName, instanceConfig); - logger.info("Participant: " + participantName + " has been added to cluster: " + clusterName); + logger.info("Participant: {} has been added to cluster: {}", participantName, clusterName); } else { if (taskTypeName != null) { zkHelixAdmin.addInstanceTag(clusterName, participantName, taskTypeName); } - zkHelixAdmin.enableInstance(clusterName, participantName, true); - logger.debug("Participant: " + participantName + " has been re-enabled at the cluster: " + clusterName); + zkHelixAdmin.enableResource(clusterName, participantName, true); + logger.debug("Participant: {} has been re-enabled at the cluster: {}", participantName, clusterName); } Runtime.getRuntime().addShutdownHook( new Thread(() -> { - logger.debug("Participant: " + participantName + " shutdown hook called"); + logger.debug("Participant: {} shutdown hook called", participantName); try { - zkHelixAdmin.enableInstance(clusterName, participantName, false); + zkHelixAdmin.enableResource(clusterName, participantName, false); } catch (Exception e) { - logger.warn("Participant: " + participantName + " was not disabled normally", e); + logger.warn("Participant: {} was not disabled normally", participantName, e); } disconnect(); }) @@ -172,7 +180,7 @@ public class HelixParticipant<T extends AbstractTask> implements Runnable { // connect the participant manager connect(); } catch (Exception ex) { - logger.error("Error in run() for Participant: " + participantName + ", reason: " + ex, ex); + logger.error("Error in run() for Participant: {}, reason: {}", participantName, ex, ex); } finally { if (zkClient != null) { zkClient.close(); @@ -191,38 +199,39 @@ public class HelixParticipant<T extends AbstractTask> implements Runnable { // register task model machineEngine.registerStateModelFactory("Task", new TaskStateModelFactory(zkHelixManager, getTaskFactory())); - logger.debug("Participant: " + participantName + ", registered state model factories."); + logger.debug("Participant: {}, registered state model factories.", participantName); zkHelixManager.connect(); - logger.info("Participant: " + participantName + ", has connected to cluster: " + clusterName); + logger.info("Participant: {}, has connected to cluster: {}", participantName, clusterName); Thread.currentThread().join(); } catch (InterruptedException ex) { - logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex); + logger.error("Participant: {}, is interrupted! reason: {}", participantName, ex, ex); } catch (Exception ex) { - logger.error("Error in connect() for Participant: " + participantName + ", reason: " + ex, ex); + logger.error("Error in connect() for Participant: {}, reason: {}", participantName, ex, ex); } finally { disconnect(); } } private void disconnect() { - logger.info("Shutting down participant. Currently available tasks " + runningTasks.size()); + logger.info("Shutting down participant. Currently available tasks {}", runningTasks.size()); if (zkHelixManager != null) { - if (runningTasks.size() > 0) { + if (!runningTasks.isEmpty()) { for (int i = 0; i <= shutdownGraceRetries; i++) { - logger.info("Shutting down gracefully [RETRY " + i + "]"); + logger.info("Shutting down gracefully [RETRY {}]", i); try { + //noinspection BusyWait Thread.sleep(shutdownGracePeriod); } catch (InterruptedException e) { - logger.warn("Waiting for running tasks failed [RETRY " + i + "]", e); + logger.warn("Waiting for running tasks failed [RETRY {}]", i, e); } - if (runningTasks.size() == 0) { + if (runningTasks.isEmpty()) { break; } } } - logger.info("Participant: " + participantName + ", has disconnected from cluster: " + clusterName); + logger.info("Participant: {}, has disconnected from cluster: {}", participantName, clusterName); zkHelixManager.disconnect(); } } diff --git a/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobEngineStarter.java b/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobEngineStarter.java index cc15a118af..f9f4b81ab0 100644 --- a/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobEngineStarter.java +++ b/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobEngineStarter.java @@ -1,53 +1,37 @@ package org.apache.airavata.ide.integration; -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.helix.core.AbstractTask; import org.apache.airavata.helix.impl.controller.HelixController; import org.apache.airavata.helix.impl.participant.GlobalParticipant; import org.apache.airavata.helix.impl.workflow.PostWorkflowManager; import org.apache.airavata.helix.impl.workflow.PreWorkflowManager; -import org.apache.airavata.monitor.email.EmailBasedMonitor; -import org.apache.helix.manager.zk.ZKHelixAdmin; -import org.apache.helix.manager.zk.ZNRecordSerializer; -import org.apache.helix.manager.zk.ZkClient; import java.util.ArrayList; public class JobEngineStarter { - public static void main(String args[]) throws Exception { - - ZkClient zkClient = new ZkClient(ServerSettings.getZookeeperConnection(), ZkClient.DEFAULT_SESSION_TIMEOUT, - ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer()); - ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient); - - zkHelixAdmin.addCluster(ServerSettings.getSetting("helix.cluster.name"), true); + public static void main(String[] args) throws Exception { System.out.println("Starting Helix Controller ......."); - // Starting helix controller HelixController controller = new HelixController(); controller.startServer(); + Thread.sleep(5000); + System.out.println("Starting Helix Participant ......."); ArrayList<Class<? extends AbstractTask>> taskClasses = new ArrayList<>(); - for (String taskClassName : GlobalParticipant.TASK_CLASS_NAMES) { taskClasses.add(Class.forName(taskClassName).asSubclass(AbstractTask.class)); } - - System.out.println("Starting Helix Participant ......."); - - // Starting helix participant GlobalParticipant participant = new GlobalParticipant(taskClasses, null); participant.startServer(); + Thread.sleep(5000); System.out.println("Starting Pre Workflow Manager ......."); - PreWorkflowManager preWorkflowManager = new PreWorkflowManager(); preWorkflowManager.startServer(); + Thread.sleep(5000); System.out.println("Starting Post Workflow Manager ......."); - PostWorkflowManager postWorkflowManager = new PostWorkflowManager(); postWorkflowManager.startServer(); } diff --git a/pom.xml b/pom.xml index d49e1e0ad1..14770240e4 100644 --- a/pom.xml +++ b/pom.xml @@ -150,7 +150,7 @@ <angus-activation.version>2.0.2</angus-activation.version> <jaxb-api.version>2.4.0-b180830.0359</jaxb-api.version> <jmockit.version>1.44</jmockit.version> - <helix.version>1.4.0</helix.version> + <helix.version>1.4.3</helix.version> <keycloak.admin.client.version>24.0.4</keycloak.admin.client.version> <resteasy.version>6.2.12.Final</resteasy.version> <httpclient.version>4.5.14</httpclient.version>
