Repository: airavata Updated Branches: refs/heads/master 67195e535 -> e5901e8e8
support forced shutdown Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2a813b80 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2a813b80 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2a813b80 Branch: refs/heads/master Commit: 2a813b80626525b7136c9ffd13c16ae04aeba748 Parents: 1263b2b Author: Saminda Wijeratne <[email protected]> Authored: Mon Mar 10 22:22:16 2014 -0400 Committer: Saminda Wijeratne <[email protected]> Committed: Mon Mar 10 22:22:16 2014 -0400 ---------------------------------------------------------------------- .../airavata/api/server/AiravataAPIServer.java | 42 +++--- .../common/utils/ApplicationSettings.java | 4 + .../apache/airavata/common/utils/IServer.java | 6 +- .../airavata/common/utils/StringUtil.java | 19 ++- .../orchestrator/server/OrchestratorServer.java | 42 +++--- .../org/apache/airavata/server/ServerMain.java | 138 +++++++++++++------ .../main/resources/airavata-server.properties | 7 +- 7 files changed, 172 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/2a813b80/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java index 53f4696..1531086 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java @@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory; public class AiravataAPIServer implements IServer{ private final static Logger logger = LoggerFactory.getLogger(AiravataAPIServer.class); + private static final String SERVER_NAME = "Airavata API Server"; + private static final String SERVER_VERSION = "1.0"; //FIXME: Read the port from airavata-server.config file private ServerStatus status; @@ -55,12 +57,10 @@ public class AiravataAPIServer implements IServer{ try { AiravataUtils.setExecutionAsServer(); RegistryInitUtil.initializeDB(); - int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.THRIFT_SERVER_PORT,"8930")); + final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.THRIFT_SERVER_PORT,"8930")); TServerTransport serverTransport = new TServerSocket(serverPort); server = new TSimpleServer( new TServer.Args(serverTransport).processor(mockAiravataServer)); - logger.info("Starting Airavata API Server on Port " + serverPort); - logger.info("Listening to Airavata Clients ...."); new Thread() { public void run() { server.serve(); @@ -69,7 +69,22 @@ public class AiravataAPIServer implements IServer{ logger.info("Airavata API Server Stopped."); } }.start(); - setStatus(ServerStatus.STARTED); + new Thread() { + public void run() { + while(!server.isServing()){ + try { + Thread.sleep(500); + } catch (InterruptedException e) { + break; + } + } + if (server.isServing()){ + setStatus(ServerStatus.STARTED); + logger.info("Starting Airavata API Server on Port " + serverPort); + logger.info("Listening to Airavata Clients ...."); + } + } + }.start(); } catch (TTransportException e) { logger.error(e.getMessage()); setStatus(ServerStatus.FAILED); @@ -127,22 +142,13 @@ public class AiravataAPIServer implements IServer{ } @Override - public void waitForServerToStart() throws Exception { - while((getStatus()==ServerStatus.STARTING || getStatus()==ServerStatus.STARTED) && !server.isServing()){ - Thread.sleep(100); - } - if (!(getStatus()==ServerStatus.STARTING || getStatus()==ServerStatus.STARTED)){ - throw new Exception("The Airavata API server did not start!!!"); - } + public String getName() { + return SERVER_NAME; } @Override - public void waitForServerToStop() throws Exception { - while((getStatus()==ServerStatus.STOPING || getStatus()==ServerStatus.STOPPED) && server.isServing()){ - Thread.sleep(100); - } - if (!(getStatus()==ServerStatus.STOPING || getStatus()==ServerStatus.STOPPED)){ - throw new Exception("Error stopping the Airavata API server!!!"); - } + public String getVersion() { + return SERVER_VERSION; } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/2a813b80/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java index f6d0cd8..82bf4dc 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java @@ -244,6 +244,10 @@ public abstract class ApplicationSettings { return getSetting("email.from"); } + /** + * @deprecated use {{@link #getSetting(String)}} + * @return + */ public static Properties getProperties() { return properties; } http://git-wip-us.apache.org/repos/asf/airavata/blob/2a813b80/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/IServer.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/IServer.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/IServer.java index 361865f..867eb45 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/IServer.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/IServer.java @@ -39,11 +39,13 @@ public interface IServer { return now; } } + public String getName(); + public String getVersion(); public void start() throws Exception; public void stop() throws Exception; public void restart() throws Exception; public void configure() throws Exception; public ServerStatus getStatus() throws Exception; - public void waitForServerToStart() throws Exception; - public void waitForServerToStop() throws Exception; +// public void waitForServerToStart() throws Exception; +// public void waitForServerToStop() throws Exception; } http://git-wip-us.apache.org/repos/asf/airavata/blob/2a813b80/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/StringUtil.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/StringUtil.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/StringUtil.java index f319218..6356326 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/StringUtil.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/StringUtil.java @@ -378,13 +378,24 @@ public class StringUtil { private static Options deriveCommandLineOptions(String[] args){ Options options = new Options(); String[] argCopy = getChangedList(args); + int i=0; for (String arg : argCopy) { if (arg.startsWith("--")){ arg=arg.substring(2); int pos = arg.indexOf('='); - String opt = pos == -1 ? arg : arg.substring(0, pos); - options.addOption(opt, true, ""); + String opt; + boolean hasArgs=true; + if (pos==-1){ //if not of the form --arg=value + if (i==argCopy.length-1 || argCopy[i+1].startsWith("-")){ // no value specified + hasArgs=false; + } + opt=arg; + }else{ + opt=arg.substring(0, pos); + } + options.addOption(opt, hasArgs, ""); } + i++; } return options; } @@ -423,11 +434,11 @@ public class StringUtil { } private static String revertOption(String option){ - return option.replaceAll(Pattern.quote(ARG_DOT_REPLACE), "."); + return option==null? option : option.replaceAll(Pattern.quote(ARG_DOT_REPLACE), "."); } private static String changeOption(String option){ - return option.replaceAll(Pattern.quote("."), ARG_DOT_REPLACE); + return option==null? option : option.replaceAll(Pattern.quote("."), ARG_DOT_REPLACE); } private static class DynamicOptionPosixParser extends PosixParser{ http://git-wip-us.apache.org/repos/asf/airavata/blob/2a813b80/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java index dfff8a5..5df526b 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java @@ -23,6 +23,7 @@ package org.apache.airavata.orchestrator.server; import org.apache.airavata.common.utils.IServer; import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.IServer.ServerStatus; import org.apache.airavata.orchestrator.cpi.OrchestratorService; import org.apache.airavata.orchestrator.util.Constants; import org.apache.thrift.server.TServer; @@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory; public class OrchestratorServer implements IServer{ private final static Logger logger = LoggerFactory.getLogger(OrchestratorServer.class); + private static final String SERVER_NAME = "Orchestrator Server"; + private static final String SERVER_VERSION = "1.0"; private ServerStatus status; @@ -49,12 +52,10 @@ public class OrchestratorServer implements IServer{ public void StartOrchestratorServer(OrchestratorService.Processor<OrchestratorServerHandler> orchestratorServerHandlerProcessor) throws Exception { try { - int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.ORCHESTRATOT_SERVER_PORT,"8940")); + final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.ORCHESTRATOT_SERVER_PORT,"8940")); TServerTransport serverTransport = new TServerSocket(serverPort); server = new TSimpleServer( new TServer.Args(serverTransport).processor(orchestratorServerHandlerProcessor)); - logger.info("Starting Orchestrator Server on Port " + serverPort); - logger.info("Listening to Orchestrator Clients ...."); new Thread() { public void run() { server.serve(); @@ -62,7 +63,22 @@ public class OrchestratorServer implements IServer{ logger.info("Orchestrator Server Stopped."); } }.start(); - setStatus(ServerStatus.STARTED); + new Thread() { + public void run() { + while(!server.isServing()){ + try { + Thread.sleep(500); + } catch (InterruptedException e) { + break; + } + } + if (server.isServing()){ + setStatus(ServerStatus.STARTED); + logger.info("Starting Orchestrator Server on Port " + serverPort); + logger.info("Listening to Orchestrator Clients ...."); + } + } + }.start(); } catch (TTransportException e) { logger.error(e.getMessage()); setStatus(ServerStatus.FAILED); @@ -117,23 +133,13 @@ public class OrchestratorServer implements IServer{ } @Override - public void waitForServerToStart() throws Exception { - while((getStatus()==ServerStatus.STARTING || getStatus()==ServerStatus.STARTED) && !server.isServing()){ - Thread.sleep(100); - } - if (!(getStatus()==ServerStatus.STARTING || getStatus()==ServerStatus.STARTED)){ - throw new Exception("The Orchestrator server did not start!!!"); - } + public String getName() { + return SERVER_NAME; } @Override - public void waitForServerToStop() throws Exception { - while((getStatus()==ServerStatus.STOPING || getStatus()==ServerStatus.STOPPED) && server.isServing()){ - Thread.sleep(100); - } - if (!(getStatus()==ServerStatus.STOPING || getStatus()==ServerStatus.STOPPED)){ - throw new Exception("Error stopping the Orchestrator server!!!"); - } + public String getVersion() { + return SERVER_VERSION; } } http://git-wip-us.apache.org/repos/asf/airavata/blob/2a813b80/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java ---------------------------------------------------------------------- diff --git a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java index 1123e54..9e1b1f8 100644 --- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java +++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java @@ -29,6 +29,7 @@ import java.util.List; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.IServer; +import org.apache.airavata.common.utils.IServer.ServerStatus; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.StringUtil; import org.apache.airavata.common.utils.StringUtil.CommandLineParameters; @@ -42,8 +43,10 @@ public class ServerMain { private final static Logger logger = LoggerFactory.getLogger(ServerMain.class); private static boolean serversLoaded=false; private static final String stopFileNamePrefix = "airavata-server-stop"; + private static final String stopFileNamePrefixForced = "airavata-server-stop-forced"; private static int serverIndex=-1; private static final String serverStartedFileNamePrefix = "airavata-server-start"; + private static boolean forcedStop=false; static{ servers = new ArrayList<IServer>(); } @@ -85,60 +88,84 @@ public class ServerMain { public static void main(String args[]) throws ParseException, IOException { CommandLineParameters commandLineParameters = StringUtil.getCommandLineParser(args); - if (commandLineParameters.getArguments().contains("stop")){ - String serverIndexOption = "serverIndex"; - if (commandLineParameters.getParameters().containsKey(serverIndexOption)){ - serverIndex=Integer.parseInt(commandLineParameters.getParameters().get(serverIndexOption)); - } - if (isServerRunning()) { - logger.info("Requesting airavata server"+(serverIndex==-1? "(s)":" instance "+serverIndex)+" to stop..."); - requestStop(); - while(isServerRunning()){ - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - logger.info("Server"+(serverIndex==-1? "(s)":" instance "+serverIndex)+" stopped!!!"); - }else{ - logger.error("Server"+(serverIndex==-1? "":" instance "+serverIndex)+" is not running!!!"); - } + performServerStopRequest(commandLineParameters); }else{ - setServerStarted(); - logger.info("Airavata server instance "+serverIndex+" starting..."); - ServerSettings.mergeSettingsCommandLineArgs(args); - startAllServers(); - while(!hasStopRequested()){ + performServerStart(args); + } + } + + private static void performServerStart(String[] args) { + setServerStarted(); + logger.info("Airavata server instance "+serverIndex+" starting..."); + ServerSettings.mergeSettingsCommandLineArgs(args); + startAllServers(); + while(!hasStopRequested()){ + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + stopAllServers(); + } + } + if (hasStopRequested()){ + stopAllServers(); + System.exit(0); + } + } + + private static void performServerStopRequest( + CommandLineParameters commandLineParameters) throws IOException { + String serverIndexOption = "serverIndex"; + if (commandLineParameters.getParameters().containsKey(serverIndexOption)){ + serverIndex=Integer.parseInt(commandLineParameters.getParameters().get(serverIndexOption)); + } + String serverForcedStop = "force"; + if (commandLineParameters.getParameters().containsKey(serverForcedStop)){ + forcedStop=true; + } + if (isServerRunning()) { + logger.info("Requesting airavata server"+(serverIndex==-1? "(s)":" instance "+serverIndex)+" to stop..."); + requestStop(); + while(isServerRunning()){ try { - Thread.sleep(2000); + Thread.sleep(5000); } catch (InterruptedException e) { - stopAllServers(); + e.printStackTrace(); } } - if (hasStopRequested()){ - stopAllServers(); - System.exit(0); - } + logger.info("Server"+(serverIndex==-1? "(s)":" instance "+serverIndex)+" stopped!!!"); + }else{ + logger.error("Server"+(serverIndex==-1? "":" instance "+serverIndex)+" is not running!!!"); } } @SuppressWarnings("resource") private static void requestStop() throws IOException { - //FIXME currently stop requests all the servers to stop File file = new File(getServerStopFileName()); file.createNewFile(); new RandomAccessFile(file, "rw").getChannel().lock(); file.deleteOnExit(); + if (forcedStop){ + // incase a previous attempt of stopping without forcing is present, best to delete that file + File f=new File((serverIndex==-1)? stopFileNamePrefix:stopFileNamePrefix+serverIndex); + if (f.exists()){ + f.deleteOnExit(); + } + } } private static boolean hasStopRequested(){ - return new File(getServerStopFileName()).exists() || new File(stopFileNamePrefix).exists(); + forcedStop=new File(getServerStopForceFileName()).exists() || new File(stopFileNamePrefixForced).exists(); + return forcedStop || new File(getServerStopFileName()).exists() || new File(stopFileNamePrefix).exists(); } private static String getServerStopFileName() { - return (serverIndex==-1)?stopFileNamePrefix:stopFileNamePrefix+serverIndex; + String filePrefix = forcedStop? stopFileNamePrefixForced : stopFileNamePrefix; + return (serverIndex==-1)? filePrefix:filePrefix+serverIndex; + } + + private static String getServerStopForceFileName() { + return (serverIndex==-1)?stopFileNamePrefixForced:stopFileNamePrefixForced+serverIndex; } private static boolean isServerRunning(){ @@ -177,15 +204,19 @@ public class ServerMain { return serverStartedFileNamePrefix+serverIndex; } - + private static int DEFAULT_FORCE_STOP_WAIT_INTERVAL=3000; public static void stopAllServers() { //stopping should be done in reverse order of starting the servers for(int i=servers.size()-1;i>=0;i--){ try { servers.get(i).stop(); - servers.get(i).waitForServerToStop(); + if (forcedStop) { + waitForServerToStop(servers.get(i),DEFAULT_FORCE_STOP_WAIT_INTERVAL); + }else{ + waitForServerToStop(servers.get(i),null); + } } catch (Exception e) { - e.printStackTrace(); + logger.error("Server Stop Error:",e); } } } @@ -197,11 +228,40 @@ public class ServerMain { for (IServer server : servers) { try { server.start(); - server.waitForServerToStart(); + waitForServerToStart(server,null); } catch (Exception e) { - e.printStackTrace(); + logger.error("Server Start Error:",e); } } } - + private static final int SERVER_STATUS_CHANGE_WAIT_INTERVAL=100; + + public static void waitForServerToStart(IServer server,Integer maxWait) throws Exception { + int count=0; + if (server.getStatus()==ServerStatus.STARTING) { + logger.info("Waiting for " + server.getName() + " to start..."); + } + while(server.getStatus()==ServerStatus.STARTING && (maxWait==null || count<maxWait)){ + Thread.sleep(SERVER_STATUS_CHANGE_WAIT_INTERVAL); + count+=SERVER_STATUS_CHANGE_WAIT_INTERVAL; + } + if (server.getStatus()!=ServerStatus.STARTED){ + throw new Exception("The "+server.getName()+" did not start!!!"); + } + } + + public static void waitForServerToStop(IServer server,Integer maxWait) throws Exception { + int count=0; + if (server.getStatus()==ServerStatus.STOPING) { + logger.info("Waiting for " + server.getName() + " to stop..."); + } + //we are doing hasStopRequested() check because while we are stuck in the loop to stop there could be a forceStop request + while(server.getStatus()==ServerStatus.STOPING && (maxWait==null || count<maxWait) && hasStopRequested() && !forcedStop){ + Thread.sleep(SERVER_STATUS_CHANGE_WAIT_INTERVAL); + count+=SERVER_STATUS_CHANGE_WAIT_INTERVAL; + } + if (server.getStatus()!=ServerStatus.STOPPED){ + throw new Exception("Error stopping the "+server.getName()+"!!!"); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/2a813b80/modules/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/server/src/main/resources/airavata-server.properties b/modules/server/src/main/resources/airavata-server.properties index e0c4b4b..35de79c 100644 --- a/modules/server/src/main/resources/airavata-server.properties +++ b/modules/server/src/main/resources/airavata-server.properties @@ -252,18 +252,15 @@ registry.service.wsdl=http://localhost:${port}/${server.context-root}/services/R # If false, disables two phase commit when submitting jobs TwoPhase=true - ###---------------------------Monitoring module Configurations---------------------------### #This will be the primary monitoring tool which runs in airavata, in future there will be multiple monitoring #mechanisms and one would be able to start a monitor -primaryMonitor=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor -#We do not support a secondaray monitoring at this point or host specific monitoring -secondaryMonitor=org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor +monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor #This is the amqp related configuration and this lists down the Rabbitmq host, this is an xsede specific configuration amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org +proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876 connection.name=xsede_private - ###---------------------------Orchestrator module Configurations---------------------------### job.submitter=org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter job.validator=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator
