Updated startup scripts of resmgr and workflowmanager to use AvroRpc implementations.
Project: http://git-wip-us.apache.org/repos/asf/oodt/repo Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/ecfe0a78 Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/ecfe0a78 Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/ecfe0a78 Branch: refs/heads/development Commit: ecfe0a78849c928d904eeb129cfd7d24f9600afc Parents: 9feacbb Author: Imesha Sudasingha <imesha.sudasin...@gmail.com> Authored: Wed Apr 18 18:45:47 2018 +0530 Committer: Imesha Sudasingha <imesha.sudasin...@gmail.com> Committed: Wed Apr 18 18:45:47 2018 +0530 ---------------------------------------------------------------------- resource/src/main/bin/resmgr | 1 - .../resource/system/ResourceManagerMain.java | 6 +++- .../system/rpc/ResourceManagerFactory.java | 18 ++++++++++ resource/src/main/resources/resource.properties | 4 +++ .../workflow/system/WorkflowManagerStarter.java | 20 +++++++---- .../system/rpc/RpcCommunicationFactory.java | 38 +++++++++----------- workflow/src/main/resources/workflow.properties | 6 ++-- 7 files changed, 58 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oodt/blob/ecfe0a78/resource/src/main/bin/resmgr ---------------------------------------------------------------------- diff --git a/resource/src/main/bin/resmgr b/resource/src/main/bin/resmgr index f80cd90..1ca4978 100644 --- a/resource/src/main/bin/resmgr +++ b/resource/src/main/bin/resmgr @@ -51,7 +51,6 @@ case "$1" in $JAVA_HOME/bin/java -Djava.ext.dirs=${CAS_RESMGR_HOME}/lib \ -Djava.util.logging.config.file=${CAS_RESMGR_HOME}/etc/logging.properties \ -Dorg.apache.oodt.cas.resource.properties=${CAS_RESMGR_PROPS} \ - -Dresmgr.manager=org.apache.oodt.cas.resource.system.AvroRpcResourceManager \ org.apache.oodt.cas.resource.system.ResourceManagerMain --portNum $SERVER_PORT & echo $! >${RUN_HOME}/cas.resmgr.pid echo "OK" http://git-wip-us.apache.org/repos/asf/oodt/blob/ecfe0a78/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java index 162d64e..ad82d06 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java @@ -43,6 +43,7 @@ public class ResourceManagerMain { final ResourceManager manager = ResourceManagerFactory.getResourceManager(portNum); manager.startUp(); + logger.info("Resource manager started at port: {}", portNum); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { @@ -53,7 +54,10 @@ public class ResourceManagerMain { for (; ; ) { try { Thread.currentThread().join(); - } catch (InterruptedException ignore) { + } catch (InterruptedException e) { + logger.error("Main thread interrupted. Exiting: {}", e.getMessage()); + manager.shutdown(); + break; } } } http://git-wip-us.apache.org/repos/asf/oodt/blob/ecfe0a78/resource/src/main/java/org/apache/oodt/cas/resource/system/rpc/ResourceManagerFactory.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/rpc/ResourceManagerFactory.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/rpc/ResourceManagerFactory.java index 9287f6e..24f56b9 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/system/rpc/ResourceManagerFactory.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/rpc/ResourceManagerFactory.java @@ -22,6 +22,8 @@ import org.apache.oodt.cas.resource.system.ResourceManagerClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileInputStream; import java.lang.reflect.Constructor; import java.net.URL; @@ -29,7 +31,22 @@ public class ResourceManagerFactory { private static final Logger logger = LoggerFactory.getLogger(ResourceManagerFactory.class); + private static void loadProperties() { + // set up the configuration, if there is any + if (System.getProperty("org.apache.oodt.cas.resource.properties") != null) { + String configFile = System.getProperty("org.apache.oodt.cas.resource.properties"); + + logger.info("Loading File Manager Configuration Properties from: [{}]", configFile); + try { + System.getProperties().load(new FileInputStream(new File(configFile))); + } catch (Exception e) { + logger.error("Error loading configuration properties from: [{}]", configFile); + } + } + } + public static ResourceManager getResourceManager(int port) throws Exception { + loadProperties(); String resourceManagerClass = System.getProperty("resmgr.manager", "org.apache.oodt.cas.resource.system.AvroRpcResourceManager"); @@ -48,6 +65,7 @@ public class ResourceManagerFactory { } public static ResourceManagerClient getResourceManagerClient(URL url) throws Exception { + loadProperties(); String resMgrClientClass = System.getProperty("resmgr.manager.client", "org.apache.oodt.cas.resource.system.AvroRpcResourceManagerClient"); http://git-wip-us.apache.org/repos/asf/oodt/blob/ecfe0a78/resource/src/main/resources/resource.properties ---------------------------------------------------------------------- diff --git a/resource/src/main/resources/resource.properties b/resource/src/main/resources/resource.properties index a467949..bf6a420 100644 --- a/resource/src/main/resources/resource.properties +++ b/resource/src/main/resources/resource.properties @@ -16,6 +16,10 @@ # # Properties required to configure the Resource Manager +# Client and server classes to be used as resource managers +resmgr.manager=org.apache.oodt.cas.resource.system.AvroRpcResourceManager +resmgr.manager.client=org.apache.oodt.cas.resource.system.AvroRpcResourceManagerClient + # resource spark master resource.runner.spark.host = mesos://<ip>:5050 http://git-wip-us.apache.org/repos/asf/oodt/blob/ecfe0a78/workflow/src/main/java/org/apache/oodt/cas/workflow/system/WorkflowManagerStarter.java ---------------------------------------------------------------------- diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/WorkflowManagerStarter.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/WorkflowManagerStarter.java index 04d5831..775574d 100644 --- a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/WorkflowManagerStarter.java +++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/WorkflowManagerStarter.java @@ -58,14 +58,20 @@ public class WorkflowManagerStarter { } loadProperties(); - WorkflowManager manager = RpcCommunicationFactory.createServer(portNum); + final WorkflowManager manager = RpcCommunicationFactory.createServer(portNum); - for (;;) - try { - Thread.currentThread().join(); - } catch (InterruptedException ignore) { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + manager.shutdown(); } - } - + }); + try { + Thread.currentThread().join(); + } catch (InterruptedException e) { + LOG.log(Level.SEVERE, String.format("Interrupted while executing: %s", e.getMessage())); + manager.shutdown(); + } + } } http://git-wip-us.apache.org/repos/asf/oodt/blob/ecfe0a78/workflow/src/main/java/org/apache/oodt/cas/workflow/system/rpc/RpcCommunicationFactory.java ---------------------------------------------------------------------- diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/rpc/RpcCommunicationFactory.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/rpc/RpcCommunicationFactory.java index 2a8e471..f0b89ac 100644 --- a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/rpc/RpcCommunicationFactory.java +++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/rpc/RpcCommunicationFactory.java @@ -19,6 +19,8 @@ package org.apache.oodt.cas.workflow.system.rpc; import org.apache.oodt.cas.workflow.system.WorkflowManager; import org.apache.oodt.cas.workflow.system.WorkflowManagerClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; @@ -27,13 +29,15 @@ import java.util.Properties; /** * @author radu - * + * <p> * Create instaces server/client for {@link WorkflowManager} and {@link WorkflowManagerClient} */ public class RpcCommunicationFactory { - private static String getRpcServerClassName(){ + private static final Logger logger = LoggerFactory.getLogger(RpcCommunicationFactory.class); + + private static String getRpcServerClassName() { InputStream prpFileStream = RpcCommunicationFactory.class.getResourceAsStream("/workflow.properties"); Properties properties = new Properties(); try { @@ -45,49 +49,39 @@ public class RpcCommunicationFactory { "org.apache.oodt.cas.workflow.system.rpc.XmlRpcWorkflowManagerFactory"); } - private static String getRpcClientClassName(){ + private static String getRpcClientClassName() { InputStream prpFileStream = RpcCommunicationFactory.class.getResourceAsStream("/workflow.properties"); Properties properties = new Properties(); try { properties.load(prpFileStream); } catch (IOException e) { - e.printStackTrace(); + logger.error("Unable to load properties", e); } return properties.getProperty("workflow.client.factory", "org.apache.oodt.cas.workflow.system.rpc.XmlRpcWorkflowManagerFactory"); } - public static WorkflowManager createServer(int port){ + public static WorkflowManager createServer(int port) { try { WorkflowManagerFactory workflowManagerFactory = (WorkflowManagerFactory) Class.forName(getRpcServerClassName()).newInstance(); workflowManagerFactory.setPort(port); return workflowManagerFactory.createServer(); - } catch (InstantiationException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); + } catch (Exception e) { + logger.error("Error creating server", e); + throw new IllegalStateException("Unable to create server", e); } - return null; } - public static WorkflowManagerClient createClient(URL url){ + public static WorkflowManagerClient createClient(URL url) { try { WorkflowManagerFactory workflowManagerFactory = (WorkflowManagerFactory) Class.forName(getRpcClientClassName()).newInstance(); workflowManagerFactory.setUrl(url); return workflowManagerFactory.createClient(); - } catch (InstantiationException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); + } catch (Exception e) { + logger.error("Unable to create client", e); + throw new IllegalStateException("Unable to create client", e); } - return null; } - - } http://git-wip-us.apache.org/repos/asf/oodt/blob/ecfe0a78/workflow/src/main/resources/workflow.properties ---------------------------------------------------------------------- diff --git a/workflow/src/main/resources/workflow.properties b/workflow/src/main/resources/workflow.properties index 912a186..328d991 100644 --- a/workflow/src/main/resources/workflow.properties +++ b/workflow/src/main/resources/workflow.properties @@ -16,10 +16,8 @@ # Properties required to configure the Workflow Manager #Rpc workflow communication class -workflow.server.factory = org.apache.oodt.cas.workflow.system.rpc.XmlRpcWorkflowManagerFactory -workflow.client.factory = org.apache.oodt.cas.workflow.system.rpc.XmlRpcWorkflowManagerFactory -#workflow.server.factory = org.apache.oodt.cas.workflow.system.AvroRpcWorkflowManagerFactory -#workflow.client.factory = org.apache.oodt.cas.workflow.system.AvroRpcWorkflowManagerFactory +workflow.server.factory = org.apache.oodt.cas.workflow.system.rpc.AvroRpcWorkflowManagerFactory +workflow.client.factory = org.apache.oodt.cas.workflow.system.rpc.AvroRpcWorkflowManagerFactory # workflow repository factory workflow.repo.factory = org.apache.oodt.cas.workflow.repository.XMLWorkflowRepositoryFactory