This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch cybershuttle-dev
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 713918e38997b85b8badde6dfa2fb3f3654351a2
Author: yasithdev <[email protected]>
AuthorDate: Fri Mar 21 23:27:03 2025 +0000

    fix issues with helix participant and job engine starter
---
 .../helix/impl/controller/HelixController.java     | 60 ++++++--------
 .../helix/impl/participant/GlobalParticipant.java  |  5 +-
 .../helix/core/participant/HelixParticipant.java   | 93 ++++++++++++----------
 .../airavata/ide/integration/JobEngineStarter.java | 26 ++----
 pom.xml                                            |  2 +-
 5 files changed, 85 insertions(+), 101 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java
index ee4656ea1c..be3c11fbe9 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java
@@ -23,15 +23,12 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.IdealState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 /**
  * TODO: Class level comments please
@@ -43,13 +40,13 @@ public class HelixController implements Runnable {
 
     private final static Logger logger = 
LoggerFactory.getLogger(HelixController.class);
 
-    private String clusterName;
-    private String controllerName;
-    private String zkAddress;
+    private final String clusterName;
+    private final String controllerName;
+    private final String zkAddress;
     private org.apache.helix.HelixManager zkHelixManager;
 
-    private CountDownLatch startLatch = new CountDownLatch(1);
-    private CountDownLatch stopLatch = new CountDownLatch(1);
+    private final CountDownLatch startLatch = new CountDownLatch(1);
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
 
     @SuppressWarnings("WeakerAccess")
     public HelixController() throws ApplicationSettingsException {
@@ -60,50 +57,43 @@ public class HelixController implements Runnable {
 
     public void run() {
         try {
-            ZkClient zkClient = new 
ZkClient(ServerSettings.getZookeeperConnection(), 
ZkClient.DEFAULT_SESSION_TIMEOUT,
-                    ZkClient.DEFAULT_CONNECTION_TIMEOUT, new 
ZNRecordSerializer());
-            ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);
-
-            // Creates the zk cluster if not available
-            if (! zkHelixAdmin.getClusters().contains(clusterName)) {
-                zkHelixAdmin.addCluster(clusterName, true);
-            }
-
+            logger.info("Zookeeper connection string: {}", zkAddress);
+            logger.info("Helix cluster: {}", clusterName);
+            logger.info("Helix controller: {}", controllerName);
+            ZKHelixAdmin zkHelixAdmin = new 
ZKHelixAdmin.Builder().setZkAddress(this.zkAddress).build();
+            logger.info("[zkAdmin] started");
+            logger.info("[zkAdmin] Cluster: {} adding if not available...", 
clusterName);
+            zkHelixAdmin.addCluster(this.clusterName, true);
+            logger.info("[zkAdmin] Cluster: {} now available!", clusterName);
             zkHelixAdmin.close();
-            zkClient.close();
+            logger.info("[zkAdmin] closed");
 
-            logger.info("Connection to helix cluster : " + clusterName + " 
with name : " + controllerName);
-            logger.info("Zookeeper connection string " + zkAddress);
-
-            zkHelixManager = 
HelixControllerMain.startHelixController(zkAddress, clusterName,
-                    controllerName, HelixControllerMain.STANDALONE);
+            logger.info("helix controller - starting...");
+            zkHelixManager = 
HelixControllerMain.startHelixController(zkAddress, clusterName, 
controllerName, HelixControllerMain.STANDALONE);
+            logger.info("Helix controller - started");
             startLatch.countDown();
             stopLatch.await();
         } catch (Exception ex) {
-            logger.error("Error in run() for Controller: " + controllerName + 
", reason: " + ex, ex);
+            logger.error("Error in run() for Controller: {}, reason: {}", 
controllerName, ex, ex);
         } finally {
             disconnect();
         }
     }
 
+    @SuppressWarnings("RedundantThrows")
     public void startServer() throws Exception {
 
-        //WorkflowCleanupAgent cleanupAgent = new WorkflowCleanupAgent();
-        //cleanupAgent.init();
-        //ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
-        //executor.scheduleWithFixedDelay(cleanupAgent, 10, 120, 
TimeUnit.SECONDS);
-
         new Thread(this).start();
         try {
             startLatch.await();
-            logger.info("Controller: " + controllerName + ", has connected to 
cluster: " + clusterName);
+            logger.info("Controller: {}, has connected to cluster: {}", 
controllerName, clusterName);
 
             Runtime.getRuntime().addShutdownHook(
                     new Thread(this::disconnect)
             );
 
         } catch (InterruptedException ex) {
-            logger.error("Controller: " + controllerName + ", is interrupted! 
reason: " + ex, ex);
+            logger.error("Controller: {}, is interrupted! reason: {}", 
controllerName, ex, ex);
         }
     }
 
@@ -114,12 +104,12 @@ public class HelixController implements Runnable {
 
     private void disconnect() {
         if (zkHelixManager != null) {
-            logger.info("Controller: " + controllerName + ", has disconnected 
from cluster: " + clusterName);
+            logger.info("Controller: {}, has disconnected from cluster: {}", 
controllerName, clusterName);
             zkHelixManager.disconnect();
         }
     }
 
-    public static void main(String args[]) {
+    public static void main(String[] args) {
         try {
 
             logger.info("Starting helix controller");
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index fb439f45e7..54a657315b 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -62,18 +62,19 @@ public class GlobalParticipant extends HelixParticipant<AbstractTask> {
         t.start();
     }
 
+    @SuppressWarnings("unused")
     public void stopServer() {
 
     }
 
-    public static void main(String args[]) {
+    public static void main(String[] args) {
         logger.info("Starting global participant");
 
         try {
             ArrayList<Class<? extends AbstractTask>> taskClasses = new 
ArrayList<>();
 
             for (String taskClassName : TASK_CLASS_NAMES) {
-                logger.debug("Adding task class: " + taskClassName + " to the 
global participant");
+                logger.debug("Adding task class: {} to the global 
participant", taskClassName);
                 
taskClasses.add(Class.forName(taskClassName).asSubclass(AbstractTask.class));
             }
 
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
index 378e678020..8bf040f513 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
@@ -23,22 +23,22 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.helix.core.support.TaskHelperImpl;
 import org.apache.airavata.helix.core.AbstractTask;
-import org.apache.airavata.helix.core.util.PropertyResolver;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
 import org.apache.helix.InstanceType;
 import org.apache.helix.examples.OnlineOfflineStateModelFactory;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.*;
 
 /**
@@ -54,15 +54,16 @@ public class HelixParticipant<T extends AbstractTask> 
implements Runnable {
     private int shutdownGracePeriod = 30000;
     private int shutdownGraceRetries = 2;
 
-    private String zkAddress;
-    private String clusterName;
-    private String participantName;
+    private final String zkAddress;
+    private final String clusterName;
+    private final String participantName;
+    private final String taskTypeName;
+
     private ZKHelixManager zkHelixManager;
 
-    private String taskTypeName;
 
-    private List<Class<? extends T>> taskClasses;
-    private final List<String> runningTasks = Collections.synchronizedList(new 
ArrayList<String>());
+    private final List<Class<? extends T>> taskClasses;
+    private final List<String> runningTasks = Collections.synchronizedList(new 
ArrayList<>());
 
     public HelixParticipant(List<Class<? extends T>> taskClasses, String 
taskTypeName) throws ApplicationSettingsException {
 
@@ -75,38 +76,41 @@ public class HelixParticipant<T extends AbstractTask> 
implements Runnable {
         this.taskTypeName = taskTypeName;
         this.taskClasses = taskClasses;
 
-        logger.info("Zookeeper connection URL " + zkAddress);
-        logger.info("Cluster name " + clusterName);
-        logger.info("Participant name " + participantName);
-        logger.info("Task type " + taskTypeName);
+        logger.info("Zookeeper connection URL {}", zkAddress);
+        logger.info("Cluster name {}", clusterName);
+        logger.info("Participant name {}", participantName);
+        logger.info("Task type {}", taskTypeName);
 
         if (taskClasses != null) {
             for (Class<? extends T> taskClass : taskClasses) {
-                logger.info("Task classes include: " + 
taskClass.getCanonicalName());
+                logger.info("Task classes include: {}", 
taskClass.getCanonicalName());
             }
         }
     }
 
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public HelixParticipant(Class<T> taskClass, String taskTypeName) throws 
ApplicationSettingsException {
         this(taskClass != null ? Collections.singletonList(taskClass) : null, 
taskTypeName);
     }
 
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setShutdownGracePeriod(int shutdownGracePeriod) {
         this.shutdownGracePeriod = shutdownGracePeriod;
     }
 
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setShutdownGraceRetries(int shutdownGraceRetries) {
         this.shutdownGraceRetries = shutdownGraceRetries;
     }
 
     public void registerRunningTask(AbstractTask task) {
         runningTasks.add(task.getTaskId());
-        logger.info("Registered Task " + task.getTaskId() + ". Currently 
available " + runningTasks.size());
+        logger.info("Registered Task {}. Currently available {}", 
task.getTaskId(), runningTasks.size());
     }
 
     public void unregisterRunningTask(AbstractTask task) {
         runningTasks.remove(task.getTaskId());
-        logger.info("Un registered Task " + task.getTaskId() + ". Currently 
available " + runningTasks.size());
+        logger.info("Un registered Task {}. Currently available {}", 
task.getTaskId(), runningTasks.size());
     }
 
     @SuppressWarnings("WeakerAccess")
@@ -116,54 +120,58 @@ public class HelixParticipant<T extends AbstractTask> 
implements Runnable {
         for (Class<? extends T> taskClass : taskClasses) {
             TaskFactory taskFac = context -> {
                 try {
-                    return AbstractTask.class.cast(taskClass.newInstance())
+                    return taskClass.getDeclaredConstructor().newInstance()
                             .setParticipant(HelixParticipant.this)
                             .setCallbackContext(context)
                             .setTaskHelper(new TaskHelperImpl());
-                } catch (InstantiationException | IllegalAccessException e) {
-                    logger.error("Failed to initialize the task: " + 
context.getTaskConfig().getId(), e);
+                } catch (InstantiationException | IllegalAccessException | 
NoSuchMethodException |
+                         InvocationTargetException e) {
+                    logger.error("Failed to initialize the task: {}", 
context.getTaskConfig().getId(), e);
                     return null;
                 }
             };
             TaskDef taskDef = taskClass.getAnnotation(TaskDef.class);
-            taskRegistry.put(taskDef.name(), taskFac);
+            if (taskDef != null) {
+                taskRegistry.put(taskDef.name(), taskFac);
+            } else {
+                logger.warn("Task class {} is not annotated with @TaskDef", 
taskClass.getCanonicalName());
+            }
         }
         return taskRegistry;
     }
 
     public void run() {
-        ZkClient zkClient = null;
+       ZkClient zkClient = null;
         try {
             zkClient = new ZkClient(zkAddress, 
ZkClient.DEFAULT_SESSION_TIMEOUT,
                     ZkClient.DEFAULT_CONNECTION_TIMEOUT, new 
ZNRecordSerializer());
-            ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);
+            ZKHelixAdmin zkHelixAdmin = new 
ZKHelixAdmin.Builder().setZkAddress(zkAddress).build();
 
             List<String> nodesInCluster = 
zkHelixAdmin.getInstancesInCluster(clusterName);
 
             if (!nodesInCluster.contains(participantName)) {
                 InstanceConfig instanceConfig = new 
InstanceConfig(participantName);
                 instanceConfig.setHostName("localhost");
-                instanceConfig.setInstanceEnabled(true);
                 if (taskTypeName != null) {
                     instanceConfig.addTag(taskTypeName);
                 }
                 zkHelixAdmin.addInstance(clusterName, instanceConfig);
-                logger.info("Participant: " + participantName + " has been 
added to cluster: " + clusterName);
+                logger.info("Participant: {} has been added to cluster: {}", 
participantName, clusterName);
             } else {
                 if (taskTypeName != null) {
                     zkHelixAdmin.addInstanceTag(clusterName, participantName, 
taskTypeName);
                 }
-                zkHelixAdmin.enableInstance(clusterName, participantName, 
true);
-                logger.debug("Participant: " + participantName + " has been 
re-enabled at the cluster: " + clusterName);
+                zkHelixAdmin.enableResource(clusterName, participantName, 
true);
+                logger.debug("Participant: {} has been re-enabled at the 
cluster: {}", participantName, clusterName);
             }
 
             Runtime.getRuntime().addShutdownHook(
                     new Thread(() -> {
-                        logger.debug("Participant: " + participantName + " 
shutdown hook called");
+                        logger.debug("Participant: {} shutdown hook called", 
participantName);
                         try {
-                            zkHelixAdmin.enableInstance(clusterName, 
participantName, false);
+                            zkHelixAdmin.enableResource(clusterName, 
participantName, false);
                         } catch (Exception e) {
-                            logger.warn("Participant: " + participantName + " 
was not disabled normally", e);
+                            logger.warn("Participant: {} was not disabled 
normally", participantName, e);
                         }
                         disconnect();
                     })
@@ -172,7 +180,7 @@ public class HelixParticipant<T extends AbstractTask> 
implements Runnable {
             // connect the participant manager
             connect();
         } catch (Exception ex) {
-            logger.error("Error in run() for Participant: " + participantName 
+ ", reason: " + ex, ex);
+            logger.error("Error in run() for Participant: {}, reason: {}", 
participantName, ex, ex);
         } finally {
             if (zkClient != null) {
                 zkClient.close();
@@ -191,38 +199,39 @@ public class HelixParticipant<T extends AbstractTask> 
implements Runnable {
             // register task model
             machineEngine.registerStateModelFactory("Task", new 
TaskStateModelFactory(zkHelixManager, getTaskFactory()));
 
-            logger.debug("Participant: " + participantName + ", registered 
state model factories.");
+            logger.debug("Participant: {}, registered state model factories.", 
participantName);
 
             zkHelixManager.connect();
-            logger.info("Participant: " + participantName + ", has connected 
to cluster: " + clusterName);
+            logger.info("Participant: {}, has connected to cluster: {}", 
participantName, clusterName);
 
             Thread.currentThread().join();
         } catch (InterruptedException ex) {
-            logger.error("Participant: " + participantName + ", is 
interrupted! reason: " + ex, ex);
+            logger.error("Participant: {}, is interrupted! reason: {}", 
participantName, ex, ex);
         } catch (Exception ex) {
-            logger.error("Error in connect() for Participant: " + 
participantName + ", reason: " + ex, ex);
+            logger.error("Error in connect() for Participant: {}, reason: {}", 
participantName, ex, ex);
         } finally {
             disconnect();
         }
     }
 
     private void disconnect() {
-        logger.info("Shutting down participant. Currently available tasks " + 
runningTasks.size());
+        logger.info("Shutting down participant. Currently available tasks {}", 
runningTasks.size());
         if (zkHelixManager != null) {
-            if (runningTasks.size() > 0) {
+            if (!runningTasks.isEmpty()) {
                 for (int i = 0; i <= shutdownGraceRetries; i++) {
-                    logger.info("Shutting down gracefully [RETRY " + i + "]");
+                    logger.info("Shutting down gracefully [RETRY {}]", i);
                     try {
+                        //noinspection BusyWait
                         Thread.sleep(shutdownGracePeriod);
                     } catch (InterruptedException e) {
-                        logger.warn("Waiting for running tasks failed [RETRY " 
+ i + "]", e);
+                        logger.warn("Waiting for running tasks failed [RETRY 
{}]", i, e);
                     }
-                    if (runningTasks.size() == 0) {
+                    if (runningTasks.isEmpty()) {
                         break;
                     }
                 }
             }
-            logger.info("Participant: " + participantName + ", has 
disconnected from cluster: " + clusterName);
+            logger.info("Participant: {}, has disconnected from cluster: {}", 
participantName, clusterName);
             zkHelixManager.disconnect();
         }
     }
diff --git a/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobEngineStarter.java b/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobEngineStarter.java
index cc15a118af..f9f4b81ab0 100644
--- a/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobEngineStarter.java
+++ b/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobEngineStarter.java
@@ -1,53 +1,37 @@
 package org.apache.airavata.ide.integration;
 
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.helix.core.AbstractTask;
 import org.apache.airavata.helix.impl.controller.HelixController;
 import org.apache.airavata.helix.impl.participant.GlobalParticipant;
 import org.apache.airavata.helix.impl.workflow.PostWorkflowManager;
 import org.apache.airavata.helix.impl.workflow.PreWorkflowManager;
-import org.apache.airavata.monitor.email.EmailBasedMonitor;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
 
 import java.util.ArrayList;
 
 public class JobEngineStarter {
 
-    public static void main(String args[]) throws Exception {
-
-        ZkClient zkClient = new 
ZkClient(ServerSettings.getZookeeperConnection(), 
ZkClient.DEFAULT_SESSION_TIMEOUT,
-                    ZkClient.DEFAULT_CONNECTION_TIMEOUT, new 
ZNRecordSerializer());
-        ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);
-
-        
zkHelixAdmin.addCluster(ServerSettings.getSetting("helix.cluster.name"), true);
+    public static void main(String[] args) throws Exception {
 
         System.out.println("Starting Helix Controller .......");
-        // Starting helix controller
         HelixController controller = new HelixController();
         controller.startServer();
+        Thread.sleep(5000);
 
+        System.out.println("Starting Helix Participant .......");
         ArrayList<Class<? extends AbstractTask>> taskClasses = new 
ArrayList<>();
-
         for (String taskClassName : GlobalParticipant.TASK_CLASS_NAMES) {
             
taskClasses.add(Class.forName(taskClassName).asSubclass(AbstractTask.class));
         }
-
-        System.out.println("Starting Helix Participant .......");
-
-        // Starting helix participant
         GlobalParticipant participant = new GlobalParticipant(taskClasses, 
null);
         participant.startServer();
+        Thread.sleep(5000);
 
         System.out.println("Starting Pre Workflow Manager .......");
-
         PreWorkflowManager preWorkflowManager = new PreWorkflowManager();
         preWorkflowManager.startServer();
+        Thread.sleep(5000);
 
         System.out.println("Starting Post Workflow Manager .......");
-
         PostWorkflowManager postWorkflowManager = new PostWorkflowManager();
         postWorkflowManager.startServer();
     }
diff --git a/pom.xml b/pom.xml
index 2254cc78b1..7506374192 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,7 +149,7 @@
         <angus-activation.version>2.0.2</angus-activation.version>
         <jaxb-api.version>2.4.0-b180830.0359</jaxb-api.version>
         <jmockit.version>1.44</jmockit.version>
-        <helix.version>1.4.0</helix.version>
+        <helix.version>1.4.3</helix.version>
         <keycloak.admin.client.version>24.0.4</keycloak.admin.client.version>
         <resteasy.version>6.2.12.Final</resteasy.version>
         <httpclient.version>4.5.14</httpclient.version>

Reply via email to