Modified: uima/uima-ducc/trunk/src/main/admin/ducc_util.py URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/src/main/admin/ducc_util.py?rev=1838082&r1=1838081&r2=1838082&view=diff ============================================================================== --- uima/uima-ducc/trunk/src/main/admin/ducc_util.py (original) +++ uima/uima-ducc/trunk/src/main/admin/ducc_util.py Wed Aug 15 12:05:15 2018 @@ -80,6 +80,10 @@ def find_ducc_uid(): pwdinfo = pwd.getpwuid(my_uid) return pwdinfo.pw_name +def base_dir(): + fpath = __file__.rsplit('/',2)[0] + return fpath + class ThreadWorker(Thread): def __init__(self, queue, outlock): Thread.__init__(self) @@ -484,7 +488,7 @@ class DuccUtil(DuccBase): hostname = line.split('.')[0] return hostname - def ssh_operational(self, node): + def ssh_operational(self, node, verbosity=True): is_operational = False req = node.split('.')[0] cmd = '/bin/hostname' @@ -500,10 +504,11 @@ class DuccUtil(DuccBase): if(req == rsp): is_operational = True; if(not is_operational): - print 'ssh not operational - unexpected results' - print ssh_cmd - for line in lines: - print line + if(verbosity): + print 'ssh not operational - unexpected results' + print ssh_cmd + for line in lines: + print line return is_operational # like popen, only it spawns via ssh @@ -736,6 +741,14 @@ class DuccUtil(DuccBase): debug('node_name: ', node_name) return node_name + def is_head_node(self): + retVal = False + head_node_list = self.get_head_node_list_variations() + node = self.get_node_name() + if(node in head_node_list): + retVal = True + return retVal + # Exit if this is not the head node. Ignore the domain as uname sometimes drops it. 
# Also check that ssh to this node works # Also restrict operations to the userid that installed ducc @@ -1053,6 +1066,15 @@ class DuccUtil(DuccBase): DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.head=' + self.ducc_properties.get('ducc.head') self.spawn(self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.common.main.DuccAdmin', '--killAll') + def ducc_admin(self,option,target): + DUCC_JVM_OPTS = ' -Dducc.deploy.configuration=' + self.DUCC_HOME + "/resources/ducc.properties " + DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -DDUCC_HOME=' + self.DUCC_HOME + DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.head=' + self.ducc_properties.get('ducc.head') + java_class = 'org.apache.uima.ducc.common.main.DuccAdmin' + cmd = self.java()+' '+DUCC_JVM_OPTS+' '+java_class+' '+option+' '+target + print cmd + self.spawn(self.java(), DUCC_JVM_OPTS, java_class, option, target) + def get_os_pagesize(self): lines = self.popen('/usr/bin/getconf', 'PAGESIZE') return lines.readline().strip() @@ -1116,9 +1138,14 @@ class DuccUtil(DuccBase): # map. The map is keyed on filename, with each entry a list of the nodes. # Skip file with suffix ".regex". 
# - def read_nodefile(self, nodefile, ret): + def read_nodefile(self, rnodefile, ret): #print 'READ_NODEFILE:', nodefile, ret n_nodes = 0 + if(rnodefile.startswith('/')): + nodefile = rnodefile + else: + nodefile = os.path.join(base_dir(),'resources',rnodefile) + #print nodefile if(nodefile.endswith('.regex')): pass elif ( os.path.exists(nodefile) ): @@ -1141,6 +1168,7 @@ class DuccUtil(DuccBase): n_nodes = n_nodes + 1 ret[nodefile] = nodes else: + n_nodes = -1 print 'Cannot read nodefile', nodefile ret[nodefile] = None @@ -1301,6 +1329,89 @@ class DuccUtil(DuccBase): def is_reliable_backup(self): return self.get_reliable_state() == 'backup' + def db_normalize_component(self,component): + com = component + if(component != None): + if('agent'.startswith(component.lower())): + com = 'Ag' + elif('orchestrator'.startswith(component.lower())): + com = 'Or' + elif('pm'.startswith(component.lower())): + com = 'Pm' + elif('rm'.startswith(component.lower())): + com = 'Rm' + elif('sm'.startswith(component.lower())): + com = 'Sm' + elif('ws'.startswith(component.lower())): + com = 'Ws' + elif('broker'.startswith(component.lower())): + com = 'Br' + elif('db'.startswith(component.lower())): + com = 'Db' + elif('database'.startswith(component.lower())): + com = 'Db' + #print 'db_normalize_component', component, '-->', com + return com + + def db_acct_start(self,node,com): + label = 'db_acct_start' + component = self.db_normalize_component(com) + CMD = [self.java(), '-DDUCC_HOME='+self.DUCC_HOME, 'org.apache.uima.ducc.database.lifetime.DbDaemonLifetimeUI', '--start', node, component] + text = ' '.join(CMD) + debug(label,text) + p = subprocess.Popen(CMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = p.communicate() + text = out + debug(label,text) + + def db_acct_stop(self,node=None,com=None): + label = 'db_acct_stop' + component = self.db_normalize_component(com) + if((node == None) and (component == None)): + CMD = [self.java(), '-DDUCC_HOME='+self.DUCC_HOME, 
'org.apache.uima.ducc.database.lifetime.DbDaemonLifetimeUI', '--stop'] + elif(component == None): + CMD = [self.java(), '-DDUCC_HOME='+self.DUCC_HOME, 'org.apache.uima.ducc.database.lifetime.DbDaemonLifetimeUI', '--stop', node] + else: + CMD = [self.java(), '-DDUCC_HOME='+self.DUCC_HOME, 'org.apache.uima.ducc.database.lifetime.DbDaemonLifetimeUI', '--stop', node, component] + text = ' '.join(CMD) + debug(label,text) + p = subprocess.Popen(CMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = p.communicate() + text = out + debug(label,text) + + def _db_get_tuple(self,line): + retVal = None + try: + if(len(line.split()) == 1): + lhs = line.split('=')[0] + rhs = line.split('=')[1] + host = lhs.split('.')[0] + daemon = lhs.split('.')[1] + state = rhs + tuple = [ host, daemon, state ] + retVal = tuple + except: + pass + return retVal + + def db_acct_query(self): + list = [] + label = 'db_acct_query' + CMD = [self.java(), '-DDUCC_HOME='+self.DUCC_HOME, 'org.apache.uima.ducc.database.lifetime.DbDaemonLifetimeUI', '--query'] + text = ' '.join(CMD) + debug(label,text) + p = subprocess.Popen(CMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = p.communicate() + text = out + debug(label,text) + lines = out.split('\n') + for line in lines: + tuple = self._db_get_tuple(line) + if(tuple != None): + list.append(tuple) + return list + def __init__(self, merge=False): global use_threading DuccBase.__init__(self, merge)
Modified: uima/uima-ducc/trunk/src/main/admin/start_ducc URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/src/main/admin/start_ducc?rev=1838082&r1=1838081&r2=1838082&view=diff ============================================================================== --- uima/uima-ducc/trunk/src/main/admin/start_ducc (original) +++ uima/uima-ducc/trunk/src/main/admin/start_ducc Wed Aug 15 12:05:15 2018 @@ -43,6 +43,9 @@ class StartDucc(DuccUtil): broker_host = self.localhost print 'Starting broker on', broker_host + node = broker_host + com = 'broker' + self.db_acct_start(node,com) lines = self.ssh(broker_host, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', 'broker', "'") while 1: line = lines.readline().strip() @@ -52,7 +55,7 @@ class StartDucc(DuccUtil): if ( line.startswith('PID') ): toks = line.split(' ') # get the PID print "Broker on", broker_host, 'PID', toks[1] - self.pids_daemons.put('broker@' + broker_host, toks[1]) + #self.pids_daemons.put('broker@' + broker_host, toks[1]) lines.close() break @@ -74,7 +77,7 @@ class StartDucc(DuccUtil): com, node = com.split('@') if (com in self.local_components): - node = self.localhost + node = self.localhost if ((com in self.default_components) or ( com == 'agent')) : msgs.append((node, 'Starting', com)) @@ -103,17 +106,19 @@ class StartDucc(DuccUtil): if ( line.startswith('PID') ): toks = line.split(' ') # get the PID msgs.append((' PID', toks[1])) - self.pids_daemons.put(com + '@' + node, toks[1]) + #self.pids_daemons.put(com + '@' + node, toks[1]) lines.close() break if ( line.startswith('WARN') ): msgs.append((' ', line)) - if ( com in self.default_components ): # tracks where the management processes are - self.pidlock.acquire() - self.pids_daemons.put(com, com + '@' + node) - self.pidlock.release() + #if ( com in self.default_components ): # tracks where the management processes are + # self.pidlock.acquire() + # self.pids_daemons.put(com, com + '@' + node) + # self.pidlock.release() + self.db_acct_start(node,com) 
+ return msgs def start_one_agent(self, args): @@ -130,9 +135,9 @@ class StartDucc(DuccUtil): if ( line.startswith('PID') ): toks = line.split(' ') pid = toks[1] - self.pidlock.acquire() - self.pids_agents.put('agent@' + host, pid) - self.pidlock.release() + #self.pidlock.acquire() + #self.pids_agents.put('agent@' + host, pid) + #self.pidlock.release() lines.close() msgs.append((spacer, 'DUCC Agent started PID', pid)) @@ -154,6 +159,8 @@ class StartDucc(DuccUtil): else: msgs.append((spacer, line)) + self.db_acct_start(host,'agent') + return msgs def verify_required_directories(self): @@ -249,11 +256,11 @@ class StartDucc(DuccUtil): nodefiles = [] components = [] or_parms = self.ducc_properties.get('ducc.orchestrator.start.type') - if(not self.is_reliable_backup()): - self.pids_agents = Properties() - self.pids_agents.load_if_exists(self.pid_file_agents) - self.pids_daemons = Properties() - self.pids_daemons.load_if_exists(self.pid_file_daemons) + #if(not self.is_reliable_backup()): + # self.pids_agents = Properties() + # self.pids_agents.load_if_exists(self.pid_file_agents) + #self.pids_daemons = Properties() + #self.pids_daemons.load_if_exists(self.pid_file_daemons) try: opts, args = getopt.getopt(argv, 'c:mn:sh?v', ['component=', 'help', 'nodelist=', 'cold', 'warm', 'nothreading']) @@ -266,15 +273,15 @@ class StartDucc(DuccUtil): for ( o, a ) in opts: if o in ( '-c', '--component' ): if (a.strip() == 'head'): - components.append('or') - components.append('pm') - components.append('rm') - components.append('sm') - components.append('ws') - components.append('db') - components.append('broker') - else: - components.append(a) + components.append('or') + components.append('pm') + components.append('rm') + components.append('sm') + components.append('ws') + components.append('db') + components.append('broker') + else: + components.append(a) elif o in ( '-n', '--nodelist' ): nodefiles.append(a) elif o in ( '--nothreading' ): @@ -366,7 +373,7 @@ class 
StartDucc(DuccUtil): ducc = Ducc() self.threadpool = ThreadPool(n_nodes + 5) # a few more for the head processes - self.pidlock = threading.Lock() + #self.pidlock = threading.Lock() #start 'or' first to field system log requests if ( len(components) != 0 ): @@ -413,11 +420,11 @@ class StartDucc(DuccUtil): self.threadpool.quit() - if(not self.is_reliable_backup()): - if ( len(self.pids_agents) > 0 ): - self.pids_agents.write(self.pid_file_agents) - if ( len(self.pids_daemons) > 0 ): - self.pids_daemons.write(self.pid_file_daemons) + #if(not self.is_reliable_backup()): + # if ( len(self.pids_agents) > 0 ): + # self.pids_agents.write(self.pid_file_agents) + #if ( len(self.pids_daemons) > 0 ): + # self.pids_daemons.write(self.pid_file_daemons) return if __name__ == "__main__": Modified: uima/uima-ducc/trunk/src/main/admin/stop_ducc URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/src/main/admin/stop_ducc?rev=1838082&r1=1838081&r2=1838082&view=diff ============================================================================== --- uima/uima-ducc/trunk/src/main/admin/stop_ducc (original) +++ uima/uima-ducc/trunk/src/main/admin/stop_ducc Wed Aug 15 12:05:15 2018 @@ -18,369 +18,622 @@ # under the License. 
# ----------------------------------------------------------------------- +features = [ 'stop_ducc from head node only', + 'support --head which stops non-agent daemons only on local head', + 'support --agents which stops on any stated node', + '--help explains that stop_ducc disables autostart', + 'employ broadcast for --stop and --quiesce of ducc daemons', + 'employ kill -15 for --stop of broker and database', + 'show ssh before and after for kill -15', + ] -import os import sys + +version_min = [2, 7] +version_info = sys.version_info +version_error = False +if(version_info[0] < version_min[0]): + version_error = True +elif(version_info[0] == version_min[0]): + if(version_info[1] < version_min[1]): + version_error = True +if(version_error): + print('Python minimum requirement is version '+str(version_min[0])+'.'+str(version_min[1])) + sys.exit(1) + +import argparse +import datetime +import textwrap import time -import getopt -import glob +import traceback -from ducc_util import DuccUtil -from properties import * -from ducc import Ducc +from threading import BoundedSemaphore +from threading import Lock +from threading import Thread -class StopDucc(DuccUtil): +from argparse import RawDescriptionHelpFormatter - def stop_component(self, component, force): +from ducc_util import DuccUtil - if ( (component == 'broker') and self.automanage_broker ): - print 'Stopping broker' - self.stop_broker() - return - if ( (component == 'db') and self.automanage_database ): - print 'Stopping database' - self.db_stop() - return +# lock for messages +lock_print = Lock() - # - # If it's an unqualified management component, we need to get it's qualified name - # - if ( component in self.default_components ): - if( component == 'agent' ): - if ( self.pids_agents.has_key(component) ): - component = self.pids_agents.get(component) - else: - print 'Skipping', component, 'not in pids file.' 
- return - else: - if ( self.pids_daemons.has_key(component) ): - component = self.pids_daemons.get(component) - else: - print 'Skipping', component, 'not in pids file.' - return +# print message +def output(msg): + with lock_print: + print msg + +# produce a time stamp +def get_timestamp(): + tod = time.time() + timestamp = datetime.datetime.fromtimestamp(tod).strftime('%Y-%m-%d %H:%M:%S') + return timestamp + +_flag_debug = False + +# record debug message +def debug(mn,text): + if(_flag_debug): + type ='D' + msg = get_timestamp()+' '+type+' '+mn+' '+text + output(msg) - # - # If the name is not qualified we've got a problem, everything in the pids file is qualified - # - if ( component.find('@') >= 0 ): - com, target_node = component.split('@') - else: - self.invalid("Must specify hostname when stopping", component) +class StopDucc(DuccUtil): - # - # If despite all that we can't find the pid, we need to run check_ducc - # - if( com == 'agent' ): - if ( not self.pids_agents.has_key(component) ): - print "Cannot find PID for component", component, ". Run check_ducc -p to refresh PIDS and then rerun stop_ducc." - return - else: - pid = self.pids_agents.get(component) - else: - if ( not self.pids_daemons.has_key(component) ): - print "Cannot find PID for component", component, ". Run check_ducc -p to refresh PIDS and then rerun stop_ducc." 
- return - else: - pid = self.pids_daemons.get(component) - + def _fn(self): + fpath = __file__.split('/') + flen = len(fpath) + return fpath[flen-1] + + # return class name + def _cn(self): + return self.__class__.__name__ - if ( force ): - print 'Stopping component', com, 'on node', target_node, 'with PID', pid, 'forcibly (kill -9)' - self.nohup(['ssh', target_node, 'kill', '-KILL', pid], False) - - pass - else: - print 'Stopping component', com, 'on node', target_node, 'with PID', pid - self.nohup(['ssh', target_node, 'kill', '-INT', pid], False) - - # clear the short name if it exists, and the long name - if( com == 'agent' ): - self.pids_agents.delete(com) - self.pids_agents.delete(component) - else: - self.pids_daemons.delete(com) - self.pids_daemons.delete(component) - - def quiesce_agents(self, components, nodes): - allnodes = [] - for ( nf, nl ) in nodes.items(): - allnodes = allnodes + nl - - for c in components: - if ( c.find('@') >= 0 ): - com, target_node = c.split('@') - allnodes.append(target_node) - else: - self.invalid("Must specify hostname when stopping", component) - - qparm = ','.join(allnodes) - print 'Quiescing', qparm - DUCC_JVM_OPTS = ' -Dducc.deploy.configuration=' + self.DUCC_HOME + "/resources/ducc.properties " - DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -DDUCC_HOME=' + self.DUCC_HOME - DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.head=' + self.ducc_properties.get('ducc.head') - self.spawn(self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.common.main.DuccAdmin', '--quiesceAgents', qparm) - - # NOTE: quiesce does not actually cause agents to terminate so we don't update the PIDs file - return - - def stop_agent(self, node, force): - self.stop_component('agent@' + node.strip(), force) + # return method name + def _mn(self): + return traceback.extract_stack(None,2)[0][2] - def usage(self, msg): - if ( msg != None ): - print msg - - print 'stop_ducc [options]' - print ' If no options are given, this help screen is shown.' 
- print '' - print ' For reliable DUCC agents will not be stopped from backup head node. ' - print '' - print ' Broker will not be stopped when ducc.broker.automanage = false. ' - print ' Database will not be stopped when ducc.database.automanage = false. ' - print '' - print 'Options:' - print ' -a --all' - print ' Stop all the DUCC processes, including agents and management processes.' - print '' - print ' -n --nodelist nodefile' - print ' Stop agents on the nodes in the nodefile. Multiple nodefiles may be specified:' - print '' - print ' stop_ducc -n foo.nodes -n bar.nodes -n baz.nodes' - print '' - print ' -c --component component' - print ' Stop a specific component. The component may be qualified with the node name' - print ' using the @ symbol: component@node.' - print '' - print ' stop_ducc -c rm@foonode' - print ' stop_ducc -c agent@barnode -c or' - print '' - print ' Components include:' - print ' agent - node agent' - print ' broker - AMQ broker' - print ' db - database' - print ' or - orchestrator' - print ' pm - process manager' - print ' rm - resource manager' - print ' sm - services manager' - print ' ws - web server' - print ' head = { or, pm, rm, sm, ws, db, broker }' - print '' - print ' -w --wait' - print ' Time to wait for everything to come down, in seconds. Default is 60.' - print '' - print ' -k --kill' - print ' Stop the component forcibly and immediately using kill -9. Use this only if a' - print ' normal stop does not work (e.g. the process may be hung).' 
- print '' - print ' --nothreading' - print ' Disable multithreaded operation if it would otherwise be used' - print '' - + c_agent = 'agent' + c_ag = 'ag' + c_broker = 'broker' + c_br = 'br' + c_database = 'database' + c_db = 'db' + c_orchestrator = 'orchestrator' + c_or = 'or' + c_pm = 'pm' + c_rm = 'rm' + c_sm = 'sm' + c_ws = 'ws' + + n_ag = 'ag' + n_br = 'br' + n_db = 'db' + n_or = 'or' + n_pm = 'pm' + n_rm = 'rm' + n_sm = 'sm' + n_ws = 'ws' + + components = [ c_agent, c_pm, c_rm, c_sm, c_or, c_ws, c_broker, c_database, ] + + shortname = { c_agent:n_ag, + c_broker:n_br, + c_database:n_db, + c_or:n_or, + c_pm:n_pm, + c_rm:n_rm, + c_sm:n_sm, + c_ws:n_ws, + c_ag:n_ag, + c_br:n_br, + c_db:n_db, + c_orchestrator:n_or, + } + + longname = { n_ag:c_agent, + n_br:c_broker, + n_db:c_database, + n_or:c_orchestrator, + n_pm:c_pm, + n_rm:c_rm, + n_sm:c_sm, + n_ws:c_ws, + } + + option_agents = '--agents' + option_all = '--all' + option_component = '--component' + option_debug = '--debug' + option_head = '--head' + option_kill = '--kill' + option_maxthreads = '--maxthreads' + option_nodelist = '--nodelist' + option_stop = '--stop' + option_quiesce = '--quiesce-then-stop' + + cmd_kill_9 = 'kill -9' + cmd_kill_15 = 'kill -15' + cmd_ssh = 'ssh' + cmd_start_ducc = 'start_ducc' + + kw_DUCC = 'DUCC' + + maxthreads = 10 + default_stop = 60 + + sig15 = 15 + sig9 = 9 + + def _exit(self): sys.exit(1) - def invalid(self, *msg): - if ( msg[0] != None ): - print ' '.join(msg) - - print "For usage run" - print " stop_ducc -h" - print 'or' - print ' stop_ducc --help' - sys.exit(1) + def _help(self): + self.parser.print_help() + self._exit + def get_epilog(self): + epilog = '' + epilog = epilog+'Notes:' + epilog = epilog+'\n' + epilog = epilog+'N1. '+self._fn()+' '+'is limited to running on a head node.' + epilog = epilog+'\n' + epilog = epilog+'N2. '+self._fn()+' '+'updates database autostart table with "stop" status.' + epilog = epilog+'\n' + epilog = epilog+'N3. 
'+self._fn()+' '+self.option_kill+' option employs '+self.cmd_ssh+' with '+self.cmd_kill_9+'.' + epilog = epilog+'\n' + epilog = epilog+'N4. '+self._fn()+' '+self.option_stop+' and '+self.option_quiesce+' options employ broadcast via broker.'\ + +'\n'\ + +' '+'The broker and database are exceptions, whereby '+self.cmd_ssh+' with '+self.cmd_kill_15+' is employed.' + epilog = epilog+'\n\n' + epilog = epilog+'Examples:' + epilog = epilog+'\n\n' + epilog = epilog+'E1. kill all daemons that were started, as recorded in the database autostart table' + epilog = epilog+'\n' + epilog = epilog+'> '+self._fn()+' '+self.option_all+' '+self.option_kill + epilog = epilog+'\n\n' + epilog = epilog+'E2. stop all head node daemons on the present node' + epilog = epilog+'\n' + epilog = epilog+'> '+self._fn()+' '+self.option_head+' '+self.option_stop + epilog = epilog+'\n\n' + epilog = epilog+'E3. stop all agents via broadcast, each will issue '+self.cmd_kill_15+' to children'\ + +'\n'\ + +' '+'then exit after a maximum of '+str(self.default_stop)+' seconds, by default' + epilog = epilog+'\n' + epilog = epilog+'> '+self._fn()+' '+self.option_agents+' '+self.option_stop + epilog = epilog+'\n\n' + epilog = epilog+'E4. quiesce all agents, each will issue '+self.cmd_kill_15+' to children then exit only'\ + +'\n'\ + +' '+'after all children have exited' + epilog = epilog+'\n' + epilog = epilog+'> '+self._fn()+' '+self.option_agents+' '+self.option_quiesce + epilog = epilog+'\n\n' + epilog = epilog+'E5. quiesce agents listed in groupA.nodes and groupB.nodes, each will issue '+self.cmd_kill_15\ + +'\n'\ + +' '+'to children then exit only after all children have exited' + epilog = epilog+'\n' + epilog = epilog+'> '+self._fn()+' '+self.option_nodelist+' groupA.nodes '+self.option_nodelist+' groupB.nodes '+self.option_quiesce + epilog = epilog+'\n\n' + epilog = epilog+'E6. 
stop agents on nodes nodeC8 and nodeD5, each will issue '+self.cmd_kill_15\ + +'\n'\ + +' '+'to children then exit after a maximum of '+str(90)+' seconds.' + epilog = epilog+'\n' + epilog = epilog+'> '+self._fn()+' '+self.option_component+' '+self.c_agent+'@nodeC8 '+self.option_component+' '+self.c_agent+'@nodeD5 '+self.option_stop+' '+str(90) + epilog = epilog+'\n\n' + epilog = epilog+'E7. stop orchestrator' + epilog = epilog+'\n' + epilog = epilog+'> '+self._fn()+' '+self.option_component+' '+self.c_or+' '+self.option_stop + epilog = epilog+'\n\n' + epilog = epilog+'E8. kill orchestrator on alternate head node nodeX3' + epilog = epilog+'\n' + epilog = epilog+'> '+self._fn()+' '+self.option_component+' '+self.c_or+'@nodeX3'+' '+self.option_kill + return epilog - def main(self, argv): - - self.verify_head() - - self.check_properties() - - if ( len(argv) == 0 ): - self.usage(None) - - components = [] - nodefiles = [] - do_agents = False - do_components = False - force = False - quiesce = False - all = False - wait_time = 60 - - try: - opts, args = getopt.getopt(argv, 'ac:n:kn:w:qh?v', ['all', 'component=', 'help', 'nodelist=', 'kill', 'quiesce', 'nothreading', 'wait']) - except: - self.invalid('Invalid arguments ' + ' '.join(argv)) - - if (len(args) > 0): - self.invalid('Invalid extra args: ', ' '.join(args)) - - for ( o, a ) in opts: - if o in ('-c', '--component' ): - if (a.strip() == 'head'): - components.append('or') - components.append('pm') - components.append('rm') - components.append('sm') - components.append('ws') - components.append('db') - components.append('broker') - else: - components.append(a) - do_components = True - elif o in ( '-a', '--all' ): - all = True - components = self.default_components - elif o in ( '-n', '--nodelist' ): - nodefiles.append(a) - do_agents = True - elif o in ( '-k', '--kill' ): - force = True - elif o in ( '-q', '--quiesce' ): - quiesce = True - elif o in ( '-w', '--wait' ): - wait_time = int(a) - elif o in ( '--nothreading' 
): - self.disable_threading() - elif ( o == '-v' ) : - print self.version() - sys.exit(0) - elif o in ( '-h', '--help' ): - self.usage(None) - elif ( o == '-?'): - self.usage(None) - else: - self.invalid('bad arg: ' + o) - - if ( quiesce ): - if ( all ): - self.invalid("May not quiesce 'all'."); - if ( force ): - self.invalid("May not both quiesce and force."); - for c in components: - if ( not c.startswith('agent') ): - self.invalid("Only agents may be quiesced.") - - - - # avoid confusion by insuring that if 'all', then nothing else is specified - if ( all and ( do_components ) ): - self.invalid("The --all option is mutually exclusive with --component") - - # 'all' means everything. we use broadcast. should use check_ducc to make sure - # it actually worked, and find the stragglers. - if ( all ): - if ( not force ) : - self.clean_shutdown() - - # Agents may wait up to 60 secs for processes to quiesce - print "Waiting " + str(wait_time) + " seconds to broadcast agent shutdown." - time.sleep(wait_time) - - if ( self.automanage_broker ): - print "Stopping broker" - self.stop_broker() - - if ( self.automanage_database ): - print "Stopping database" - self.db_stop() - - if ( os.path.exists(self.pid_file_agents) ): - os.remove(self.pid_file_agents) - if ( os.path.exists(self.pid_file_daemons) ): - os.remove(self.pid_file_daemons) - return + help_all = 'Stop all DUCC management and agent processes by using database entries recorded by start_ducc. Only allowed if '+option_kill+' option is also specified.' + help_head = 'Stop the DUCC management processes on the present head node by using database entries recorded by start_ducc.' + help_agents = 'Stop the DUCC agents processes on all nodes by using database entries recorded by '+cmd_start_ducc+'.' + help_nodelist = 'Stop agents on the nodes in the nodefile. Multiple nodefiles may be specified.' + help_component = 'Stop a specific component. 
The component may be qualified with the node name using the @ symbol: component@node.'\ + + ' If node is not specified, then the localhost is presumed. Multiple components may be specified. components = '+str(components)+'.'\ + + ' Specification of a head node component other than on the present head node is disallowed unless '+option_kill+' option is also specified.'\ + + ' Specification of broker or database is disallowed unless that component is automanaged by '+kw_DUCC+'.' + help_kill = 'Stop the component(s) forcibly and immediately using '+cmd_ssh+' with '+cmd_kill_9+'. Use this only if a normal stop does not work (e.g. the process may be hung).' + help_stop = 'Stop the component(s) gracefully using broadcast. Agents allow children specified time (in seconds) to exit. Default is '+str(default_stop)+'.'\ + + ' Broadcast is not used for broker and database, instead a direct kill -15 is employed.' + help_quiesce = 'Stop the component(s) gracefully using broadcast. Agents exit only when no children exist. Children are given infinite time to exit.' + help_maxthreads = 'Maximum concurrent threads. Default = '+str(maxthreads)+'.' + help_debug = 'Display debugging messages.' 
+ + # get user specified command line args + def get_args(self): + self.parser = argparse.ArgumentParser(formatter_class=RawDescriptionHelpFormatter,epilog=self.get_epilog()) + group1 = self.parser.add_mutually_exclusive_group(required=True) + group1.add_argument(self.option_all, action='store_true', help=self.help_all) + group1.add_argument(self.option_head, action='store_true', help=self.help_head) + group1.add_argument(self.option_agents, action='store_true', help=self.help_agents) + group1.add_argument(self.option_nodelist, '-n', action='append', help=self.help_nodelist) + group1.add_argument(self.option_component, '-c', action='append', help=self.help_component) + group2 = self.parser.add_mutually_exclusive_group(required=True) + group2.add_argument(self.option_kill, '-k', action='store_true', help=self.help_kill) + group2.add_argument(self.option_stop, '-s', action='store', type=int, nargs='?', const=self.default_stop, help=self.help_stop) + group2.add_argument(self.option_quiesce, '-q', action='store_true', help=self.help_quiesce) + self.parser.add_argument(self.option_maxthreads, '-m', action='store', type=int, default=None, help=self.help_maxthreads) + self.parser.add_argument(self.option_debug, '-d', action='store_true', help=self.help_debug) + self.args = self.parser.parse_args() + # sepcial cases + if(self.args.kill): + if(self.args.maxthreads == None): + self.args.maxthreads = self.maxthreads + elif(self.args.stop): + if(self.args.maxthreads == None): + self.args.maxthreads = 2 else: - if ( len(nodefiles) == 0 ): - nodefiles = self.default_nodefiles - - - self.pids_agents = Properties() - self.pids_daemons = Properties() - sc = set(components) - sb = set(['broker', 'db']) - read_pids = True - if ( sc.issubset(sb) ): - read_pids = False - - # The broker and db do not set the pid file - if ( read_pids ): - try: - if(not self.is_reliable_backup()): - self.pids_agents.load(self.pid_file_agents) - self.pids_daemons.load(self.pid_file_daemons) - except 
PropertiesException, (inst): - print inst.msg - print '' - print 'Run check_ducc -p to refresh the PIDs file, or check_ducc -k to search for and', - print 'kill all DUCC processes.' - print '' - sys.exit(1) - - # - # if not 'all', we use nodefiles and component names - # - - # make sure all the nodefiles exist and are readable - ok = True - nodes = {} - n_nodes = 0 - for n in nodefiles: - n_nodes, nodes = self.read_nodefile(n, nodes) - - for ( nf, nl ) in nodes.items(): - if ( nl == None ): # die early if the parameters are wrong - print "Can't read nodefile", nf - ok = False - - if ( not ok ): - sys.exit(1) - - if ( quiesce ): - if(self.is_reliable_backup()): - print '********** "backup" head node -> not quiescing agents' + self.parser.error(self.option_maxthreads+' requires '+self.option_kill) + elif(self.args.maxthreads != None): + self.parser.error(self.option_maxthreads+' requires '+self.option_kill) + # debug + if(self.args.debug): + global _flag_debug + _flag_debug = True + text = str(self.args) + debug(self._mn(),text) + + db_list = None + + # fetch and cache list of tuples comprising + # daemon@node from database autostart table + def get_db_list(self): + if(self.db_list == None): + self.db_list = self.db_acct_query() + return self.db_list + + # --all + def all(self): + text = str(self.args.all) + debug(self._mn(),text) + # get list of tuples from DB: + # [ host, component, state ] + list = self.get_db_list() + return list + + # --head + def head(self): + text = str(self.args.head) + debug(self._mn(),text) + # get list of tuples from DB: + # [ host, component, state ] + db_list = self.get_db_list() + list = [] + this_node = self.get_node_name() + for item in db_list: + node = item[0] + component = item[1] + if(component == self.n_ag): + continue + if(node != this_node): + continue + list.append(item) + return list + + # --agents + def agents(self): + text = str(self.args.agents) + debug(self._mn(),text) + # get list of tuples from DB: + # [ host, 
component, state ] + db_list = self.get_db_list() + list = [] + for item in db_list: + node = item[0] + component = item[1] + if(component == self.n_ag): + list.append(item) + text = 'add: '+'node:'+node+' '+'component:'+component + debug(self._mn(),text) else: - self.quiesce_agents(components, nodes) - else: - if(self.is_reliable_backup()): - print '********** "backup" head node -> not stopping agents' + text = 'skip: '+'node:'+node+' '+'component:'+component + debug(self._mn(),text) + return list + + # --nodelist + def nodelist(self): + text = str(self.args.nodelist) + debug(self._mn(),text) + component = 'ag' + state = '' + list = [] + map = {} + # fetch map where key is nodefile filename + # and value is list of nodes + for nodefile in self.args.nodelist: + nodes, map = self.read_nodefile(nodefile,map) + if(nodes < 0): + self._exit() + # create list of tuples from nodelist file(s): + # [ host, component, state ] + for key in map: + nodes = map[key] + for node in nodes: + entry = [ node, component, state ] + list.append(entry) + return list + + # --component + def complist(self): + text = str(self.args.component) + debug(self._mn(),text) + list = [] + # validate components specified on cmdline + for c in self.args.component: + parts = c.split('@') + if len(parts) == 1: + dn = self.get_node_name() + dc = parts[0] + elif len(parts) == 2: + dn = parts[1] + dc = parts[0] + if(dc.startswith('all')): + msg = 'node specification disallowed for: '+dc + output(msg) + self._exit() else: - for (nf, nl) in nodes.items(): - for n in nl: - self.stop_agent(n, force) - host = self.localhost.split('.')[0] - for c in components: - c = c.strip() - if(c in ('pm','rm','sm','ws')): - c = c+'@'+host - self.stop_component(c, force) - time.sleep(2) - for c in components: - c = c.strip() - if(c in ('or')): - c = c+'@'+host - self.stop_component(c, force) - time.sleep(2) - for c in components: - c = c.strip() - if(c in ('db','broker')): - self.stop_component(c, force) - - if ( read_pids 
): - if(not self.is_reliable_backup()): - if ( len(self.pids_agents) > 0 ): - self.pids_agents.write(self.pid_file_agents) + msg = 'invalid component: '+c + output(msg) + self._exit() + if(not dc in self.components): + msg = 'invalid component: '+c + output(msg) + self._exit() + text = dc+'.'+dn + debug(self._mn(),text) + node = dn + component = self.shortname[dc] + entry = [ node, component, '' ] + list.append(entry) + return list + + # disallow br/db unless automanaged + def validate_automanage(self,component): + if(component == 'br'): + if(not self.automanage_broker): + msg = 'component='+component+' '+'not automanaged.' + output(msg) + self._exit() + elif(component == 'db'): + if(not self.automanage_database): + msg = 'component='+component+' '+'not automanaged.' + output(msg) + self._exit() + + # disallow unless in db + def validate_db(self,node,component): + list = self.get_db_list() + for item in list: + db_node = item[0] + db_component = item[1] + if(db_node == node): + if(db_component == component): + return + msg = 'node='+node+' '+'component='+component+' not found in database autostart table' + output(msg) + self._exit() + + # validate user specified list + def validate_list(self,list): + for item in list: + node = item[0] + component = item[1] + self.validate_automanage(component) + self.validate_db(node, component) + + # in: tuples of (component,pid,user) and a desired component + # out: the pid of the desired component, if found + def find_pid(self,tuples,component): + pid = None + for tuple in tuples: + t_component = tuple[0] + t_pid = tuple[1] + t_user = tuple[2] + if(t_user == self.ducc_uid): + if(self.shortname.has_key(t_component)): + t_comp = self.shortname[t_component] + if(t_comp == component): + pid = t_pid + break + return pid + + # target=kill + def kill(self,count,tid,node,component,signal): + self.db_acct_stop(node,component) + verbosity=False + ssh = self.ssh_operational(node,verbosity) + state = 'state=pending' + pfx = 'kill'+' 
'+'daemon='+str(count)+' '+'thread='+str(tid)+' '+'node='+node+' '+'component='+component+' ' + msg = pfx+state + output(msg) + if(ssh): + process='' + state='state=success' + status, tuples = self.find_ducc_process(node) + if(status): + pid = self.find_pid(tuples,component) + if(pid == None): + state='state=component not found' else: - os.remove(self.pid_file_agents) - if ( len(self.pids_daemons) > 0 ): - self.pids_daemons.write(self.pid_file_daemons) + self.ssh(node, True, 'kill', '-'+str(signal), str(pid)) + process='pid='+str(pid)+' ' else: - os.remove(self.pid_file_daemons) - - return - -if __name__ == "__main__": - stopper = StopDucc() - stopper.main(sys.argv[1:]) - + state='state=find DUCC process failed' + else: + state = 'state=ssh failure' + msg = pfx+process+state + output(msg) + self.put_tid(tid) + self.pool.release() + + # launch threads to perform kills + def kill_threads(self,list): + size = len(list) + msg = 'daemons='+str(len(list)) + output(msg) + count = 0 + for raw_type in self.components: + type = self.shortname[raw_type] + for item in list: + node = item[0] + component = item[1] + if(component == type): + count = count+1 + self.pool.acquire() + tid = self.get_tid() + signal = self.sig9 + t = Thread(target=self.kill, args=(count,tid,node,component,signal)) + t.start() + + # target=stop + def stop(self,list): + text = 'list='+str(list) + debug(self._mn(),text) + # validate + self.head_on_node_only(list) + # update database + build admin string + self.threads_prep() + admin = '' + stop_db = False + stop_broker = False + for item in list: + node = item[0] + com = item[1] + self.db_acct_stop(node,com) + component = self.longname[com] + if(component == self.c_broker): + stop_broker = True + continue + elif(com == self.c_database): + stop_db = True + continue + else: + component = self.longname[com] + admin = admin+component+'@'+node+' ' + self.db_acct_stop(node,component) + # issue command + admin = admin.strip() + if(len(admin) > 0): + admin = 
str(self.args.stop)+' '+admin + print "stop: "+admin + self.ducc_admin('--stop',admin) + # stop broker + if(stop_broker): + self.stop_broker() + # stop database + if(stop_db): + self.db_stop() + # target=quiesce + def quiesce(self,list): + text = 'list='+str(list) + debug(self._mn(),text) + # validate + self.agent_only(list) + # update database + build admin string + admin = '' + for item in list: + node = item[0] + com = item[1] + self.db_acct_stop(node,com) + component = self.longname[com] + admin = admin+component+'@'+node+' ' + # issue command + admin = admin.strip() + if(len(admin) > 0): + print "quiesce: "+admin + self.ducc_admin('--quiesce',admin) + + # only head node component on present node allowed + def head_on_node_only(self,list): + head = self.get_node_name() + for item in list: + component = item[1] + if(component != self.n_ag): + node = item[0] + if(node != head): + 'invalid node='+node+' for component='+component + self._exit() + + # only agent component allowed + def agent_only(self,list): + for item in list: + component = item[1] + if(component != self.n_ag): + 'invalid component='+component + self._exit() + + # if this command is not running from a head node, + # then complain and exit + def enforce_location_limits(self,list): + if(not self.is_head_node()): + msg = 'cannot run from non-head node.' 
+ output(msg) + self._exit() + + # multi-thread lock to obtain thread id + lock_tid = Lock() + + # get thread id + def get_tid(self): + with self.lock_tid: + tid = self.tids.pop(0) + return tid + + # return thread id + def put_tid(self,tid): + with self.lock_tid: + self.tids.append(tid) + + # initialize for threading + def threads_prep(self): + maxthreads = self.args.maxthreads + self.tids = range(1,maxthreads+1) + self.pool = BoundedSemaphore(value=maxthreads) + + # main + def main(self,argv): + self.get_args() + # get list of nodes+daemons + if(self.args.all): + if(self.args.kill): + list = self.all() + else: + msg = 'cannot specify '+self.option_all+' unless '+self.option_kill+' is also specified.' + output(msg) + self._exit() + elif(self.args.head): + list = self.head() + elif(self.args.agents): + list = self.agents() + elif(self.args.nodelist != None): + list = self.nodelist() + elif(self.args.component != None): + list = self.complist() + else: + self._help() + text = str(list) + debug(self._mn(),text) + # disallow br/db unless DUCC managed + self.validate_list(list) + # allow only from head node, except for stop of local agent + self.enforce_location_limits(list) + # perform action + if(self.args.kill): + self.threads_prep() + self.kill_threads(list) + elif(self.args.stop != None): + self.stop(list) + elif(self.args.quiesce_then_stop): + self.quiesce(list) + else: + self._help() + +if __name__ == '__main__': + instance = StopDucc() + instance.main(sys.argv[1:]) Added: uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetime.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetime.java?rev=1838082&view=auto ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetime.java (added) +++ 
uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetime.java Wed Aug 15 12:05:15 2018 @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +*/ + +/** + * Class to maintain and query database table of "autostart" daemons. + * + * > use --start to add host + daemon name to table. + * > use --stop to remove host + daemon name from table. + * > use --query to determine if host + daemon name resides in table. 
+ * + */ + +package org.apache.uima.ducc.database.lifetime; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.log4j.Level; +import org.apache.uima.ducc.common.db.DbHelper; +import org.apache.uima.ducc.common.utils.DuccLogger; +import org.apache.uima.ducc.common.utils.id.DuccId; +import org.apache.uima.ducc.database.DbHandle; +import org.apache.uima.ducc.database.DbManager; +import org.apache.uima.ducc.database.DbUtil; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.exceptions.InvalidQueryException; + +/* + * Class to manage table of DUCC daemons & desired state with respect to delete, query, quiesce, start, and stop. + */ +public class DbDaemonLifetime implements IDbDaemonLifetime { + + private DuccLogger logger = null; + private DuccId id = null; + + public static final Integer RC_Success = DbDaemonLifetimeCommon.RC_Success; + + private static DbManager dbManager = null; + + private static String KEYSPACE = "DUCC"; + private static String DAEMON_LIFETIME_TABLE = DaemonLifetimeProperties.TABLE_NAME.pname(); + + private static String key_host = DaemonLifetimeProperties.host.name(); + private static String key_daemon = DaemonLifetimeProperties.daemon.name(); + private static String key_state = DaemonLifetimeProperties.state.name(); + private static String key_tod = DaemonLifetimeProperties.tod.name(); + + public DbDaemonLifetime() { + logger = DuccLogger.getLogger(DbDaemonLifetime.class); + init(); + } + + public DbDaemonLifetime(Level level) { + logger = DuccLogger.getLogger(DbDaemonLifetime.class); + logger.setLevel(level); + init(); + } + + public DbDaemonLifetime(DuccLogger ducclogger) { + logger = ducclogger; + init(); + } + + private List<SimpleStatement> db_mkSchema() throws Exception { + List<SimpleStatement> ret = new 
ArrayList<SimpleStatement>(); + StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + DAEMON_LIFETIME_TABLE + " ("); + buf.append(DbUtil.mkSchema(DaemonLifetimeProperties.values())); + buf.append(")"); + ret.add(new SimpleStatement(buf.toString())); + List<String> indexes = DbUtil.mkIndices(DaemonLifetimeProperties.values(), DAEMON_LIFETIME_TABLE); + for (String s : indexes) { + ret.add(new SimpleStatement(s)); + } + return ret; + } + + private boolean db_init() { + String mn = "db_init"; + boolean retVal = false; + try { + List<SimpleStatement>specificationsSchema = db_mkSchema(); + DbHandle h = dbManager.open(); + for ( SimpleStatement s : specificationsSchema ) { + logger.debug(mn, id, "EXECUTE STATEMENT:"+s.toString()); + h.execute(s); + } + retVal = true; + } + catch(Exception e) { + logger.error(mn, id, e); + } + return retVal; + } + + private boolean init(String[] dburls) throws Exception { + String mn = "init"; + boolean retVal = false; + try { + dbManager = new DbManager(dburls, logger); + dbManager.init(); + retVal = true; + } catch (Exception e) { + logger.error(mn, id, "Errors contacting database. 
No connetion made."); + logger.error(mn, id, e); + } + return retVal; + } + + private boolean init() { + String mn = "init"; + boolean retVal = false; + try { + String[] dbUrls = DbHelper.getHostList(); + StringBuffer sb = new StringBuffer(); + for(String dbUrl : dbUrls) { + sb.append(dbUrl+" "); + } + logger.debug(mn, id, dbUrls.length+" "+sb.toString()); + retVal = init(dbUrls); + } + catch(Exception e) { + logger.error(mn, id, e); + } + return retVal; + } + + private void shutdown() { + dbManager.shutdown(); + } + + private DbResult db_upsert(String host, String daemon, String state, Long tod) { + String mn = "db_upsert"; + DbResult dbResult = new DbResult(); + try { + DbHandle h = dbManager.open(); + PreparedStatement ps = h.prepare("INSERT INTO " + DAEMON_LIFETIME_TABLE + " (host, daemon, state, tod) values(?, ?, ?, ?);"); + logger.debug(mn, id, "EXECUTE STATEMENT:"+ps.toString()+" ("+host+","+daemon+","+state+","+tod+")"); + logger.debug(mn, id, host, daemon, state, tod); + ResultSet rs = h.execute(ps, host, daemon, state, tod); + for ( Row row : rs ) { + int width = row.getColumnDefinitions().size(); + if(width > 1) { + StringBuffer sb = new StringBuffer(); + sb.append(key_host+"="+row.getString(key_host)); + sb.append(" "); + sb.append(key_daemon+"="+row.getString(key_daemon)); + sb.append(" "); + sb.append(key_state+"="+row.getString(key_state)); + sb.append(" "); + sb.append(key_tod+"="+row.getLong(key_tod)); + logger.debug(mn, id, sb.toString()); + } + } + dbResult.rc = RC_Success; + } + catch(Exception e) { + logger.error(mn, id, e); + } + logger.debug(mn, id, dbResult.rc); + return dbResult; + } + + private DbResult db_update(String host, String daemon, String state, Long tod) { + String mn = "db_update"; + DbResult dbResult = new DbResult(); + try { + DbHandle h = dbManager.open(); + PreparedStatement ps; + ResultSet rs; + if(host.equals("*")) { + ps = h.prepare("UPDATE " + DAEMON_LIFETIME_TABLE + " SET state=?, tod=? WHERE daemon=? 
IF EXISTS;"); + logger.debug(mn, id, "EXECUTE STATEMENT:"+ps.toString()+" ("+state+","+tod+","+daemon+")"); + logger.debug(mn, id, daemon, state, tod); + rs = h.execute(ps, state, tod, daemon); + } + else { + ps = h.prepare("UPDATE " + DAEMON_LIFETIME_TABLE + " SET state=?, tod=? WHERE host=? AND daemon=? IF EXISTS;"); + logger.debug(mn, id, "EXECUTE STATEMENT:"+ps.toString()+" ("+state+","+tod+","+host+","+daemon+")"); + logger.debug(mn, id, host, daemon, state, tod); + rs = h.execute(ps, state, tod, host, daemon); + } + for ( Row row : rs ) { + int width = row.getColumnDefinitions().size(); + if(width > 1) { + StringBuffer sb = new StringBuffer(); + sb.append(key_host+"="+row.getString(key_host)); + sb.append(" "); + sb.append(key_daemon+"="+row.getString(key_daemon)); + sb.append(" "); + sb.append(key_state+"="+row.getString(key_state)); + sb.append(" "); + sb.append(key_tod+"="+row.getLong(key_tod)); + logger.debug(mn, id, sb.toString()); + System.out.println(sb.toString()); + } + } + dbResult.rc = RC_Success; + } + catch(Exception e) { + logger.error(mn, id, e); + e.printStackTrace(); + } + logger.debug(mn, id, dbResult.rc); + return dbResult; + } + + private DbResult db_delete(String host, String daemon) { + String mn = "db_delete"; + DbResult dbResult = new DbResult(); + try { + String w_host = key_host+"="+"'"+host+"'"; + String w_daemon = key_daemon+"="+"'"+daemon+"'"; + String w_clause = "WHERE "+w_host+" AND "+w_daemon; + String cql = "DELETE FROM "+KEYSPACE+"."+DAEMON_LIFETIME_TABLE+" "+w_clause+";"; + logger.debug(mn, id, "EXECUTE STATEMENT:"+cql); + DbHandle h = dbManager.open(); + ResultSet rs = h.execute(cql); + for ( Row row : rs ) { + int width = row.getColumnDefinitions().size(); + if(width > 1) { + StringBuffer sb = new StringBuffer(); + sb.append(key_host+"="+row.getString(key_host)); + sb.append(" "); + sb.append(key_daemon+"="+row.getString(key_daemon)); + sb.append(" "); + sb.append(key_tod+"="+row.getLong(key_tod)); + logger.debug(mn, id, 
sb.toString()); + } + } + dbResult.rc = RC_Success; + } + catch(InvalidQueryException e) { + if(e.getMessage().equals("unconfigured table daemonlifetime")) { + logger.debug(mn, id, e); + } + else { + logger.error(mn, id, e); + } + } + catch(Exception e) { + logger.error(mn, id, e); + } + logger.debug(mn, id, dbResult.rc); + return dbResult; + } + + private void add_kw_where(StringBuffer sb, String host, String daemon) { + if(host != null) { + sb.append("WHERE"); + sb.append(" "); + } + else if(daemon != null) { + sb.append("WHERE"); + sb.append(" "); + } + } + + private void add_host(StringBuffer sb, String host) { + if(host != null) { + sb.append("host = "); + sb.append("'"); + sb.append(host); + sb.append("'"); + sb.append(" "); + } + } + + private void add_kw_and(StringBuffer sb, String host, String daemon) { + if(host != null) { + if(daemon != null) { + sb.append("AND"); + sb.append(" "); + } + } + } + + private void add_daemon(StringBuffer sb, String daemon) { + if(daemon != null) { + sb.append("daemon = "); + sb.append("'"); + sb.append(daemon); + sb.append("'"); + sb.append(" "); + } + } + + private String get_where_clause(String host, String daemon) { + StringBuffer where_clause = new StringBuffer(); + add_kw_where(where_clause,host,daemon); + add_host(where_clause,host); + add_kw_and(where_clause,host,daemon); + add_daemon(where_clause,daemon); + String retVal = where_clause.toString().trim(); + return retVal; + } + + private DbResult db_query(String host, String daemon) { + String mn = "db_query"; + DbResult dbResult = new DbResult(); + List<DaemonInfo> list = new ArrayList<DaemonInfo>(); + try { + String where_clause = get_where_clause(host,daemon); + String cql = "SELECT * FROM "+KEYSPACE+"."+DAEMON_LIFETIME_TABLE+" "+where_clause+";"; + logger.debug(mn, id, cql); + DbHandle h = dbManager.open(); + ResultSet rs = h.execute(cql); + Iterator<Row> iterator = rs.iterator(); + int rows = 0; + while ( iterator.hasNext()) { + rows = rows + 1; + Row row = 
iterator.next(); + int width = row.getColumnDefinitions().size(); + if(width > 1) { + DaemonInfo daemonInfo = new DaemonInfo( + row.getString(key_host), + row.getString(key_daemon), + row.getString(key_state), + row.getLong(key_tod) + ); + list.add(daemonInfo); + StringBuffer sb = new StringBuffer(); + sb.append(key_host+"="+daemonInfo.host); + sb.append(" "); + sb.append(key_daemon+"="+daemonInfo.daemon); + sb.append(" "); + sb.append(key_state+"="+daemonInfo.state); + sb.append(" "); + sb.append(key_tod+"="+daemonInfo.tod); + logger.debug(mn, id, sb.toString()); + } + } + dbResult.list = list; + dbResult.rc = RC_Success; + } + catch(InvalidQueryException e) { + if(e.getMessage().equals("unconfigured table daemonlifetime")) { + logger.debug(mn, id, e); + } + else { + logger.error(mn, id, e); + } + } + catch(Exception e) { + logger.error(mn, id, e); + } + logger.debug(mn, id, dbResult.rc); + return dbResult; + } + + private DbResult db_quiesce(String host, String daemon, Long tod) { + String mn = "db_quiesce"; + DbResult dbResult = new DbResult(); + if(init()) { + if(db_init()) { + dbResult = db_update(host, daemon, DesiredState.Quiesce.name(), tod); + } + shutdown(); + } + logger.debug(mn, id, dbResult.rc); + return dbResult; + } + + private DbResult db_start(String host, String daemon, Long tod) { + String mn = "db_start"; + DbResult dbResult = new DbResult(); + if(init()) { + if(db_init()) { + dbResult = db_upsert(host, daemon, DesiredState.Start.name(), tod); + } + shutdown(); + } + logger.debug(mn, id, dbResult.rc); + return dbResult; + } + + private DbResult db_stop(String host, String daemon, Long tod) { + String mn = "db_stop"; + DbResult dbResult = new DbResult(); + if(init()) { + if(db_init()) { + dbResult = db_update(host, daemon, DesiredState.Stop.name(), tod); + } + shutdown(); + } + logger.debug(mn, id, dbResult.rc); + return dbResult; + } + + // + + @Override + public DbResult delete(String host, String daemon) { + DbResult dbResult = new DbResult(); 
+ if(init()) { + dbResult = db_delete(host, daemon); + shutdown(); + } + return dbResult; + } + + @Override + public DbResult query(String host, String daemon) { + DbResult dbResult = new DbResult(); + if(init()) { + dbResult = db_query(host, daemon); + shutdown(); + } + return dbResult; + } + + @Override + public DbResult quiesce(String host, String daemon, Long tod) { + DbResult dbResult = new DbResult(); + if(init()) { + dbResult = db_quiesce(host, daemon, tod); + shutdown(); + } + return dbResult; + } + + @Override + public DbResult start(String host, String daemon, Long tod) { + DbResult dbResult = new DbResult(); + if(init()) { + dbResult = db_start(host, daemon, tod); + shutdown(); + } + return dbResult; + } + + @Override + public DbResult stop(String host, String daemon, Long tod) { + DbResult dbResult = new DbResult(); + if(init()) { + dbResult = db_stop(host, daemon, tod); + shutdown(); + } + return dbResult; + } + +} \ No newline at end of file Propchange: uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetime.java ------------------------------------------------------------------------------ svn:eol-style = native Added: uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetimeCommon.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetimeCommon.java?rev=1838082&view=auto ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetimeCommon.java (added) +++ uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetimeCommon.java Wed Aug 15 12:05:15 2018 @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +*/ + +package org.apache.uima.ducc.database.lifetime; + +/* + * Class comprising common methods and data for this package. + */ +public class DbDaemonLifetimeCommon { + + public static final Integer RC_Failure = new Integer(-1); + public static final Integer RC_Success = new Integer(0); + public static final Integer RC_Help = new Integer(1); + + public static String normalize(String value) { + String retVal = value; + if(value != null) { + retVal = value.toLowerCase(); + } + return retVal; + } + + public static String normalize_kw(String kw) { + String retVal = normalize(kw); + return retVal; + } + + public static String normalize_daemon(String daemon) { + String retVal = normalize(daemon); + return retVal; + } + + public static String normalize_host(String host) { + String retVal = normalize(host); + if(host != null) { + retVal = host.split("\\.")[0]; + } + return retVal; + } +} Propchange: uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetimeCommon.java ------------------------------------------------------------------------------ svn:eol-style = native Added: uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetimeUI.java URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetimeUI.java?rev=1838082&view=auto ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetimeUI.java (added) +++ uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetimeUI.java Wed Aug 15 12:05:15 2018 @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +*/ + +package org.apache.uima.ducc.database.lifetime; + +import java.util.List; + +import org.apache.log4j.Level; +import org.apache.uima.ducc.database.lifetime.IDbDaemonLifetime.Daemon; +import org.apache.uima.ducc.database.lifetime.IDbDaemonLifetime.DaemonInfo; +import org.apache.uima.ducc.database.lifetime.IDbDaemonLifetime.DbResult; + +/* + * Class comprising UI to manage table of DUCC daemons & desired state with respect to delete, query, quiesce, start, and stop. 
+ */ +public class DbDaemonLifetimeUI { + + public static final Integer RC_Failure = DbDaemonLifetimeCommon.RC_Failure; + public static final Integer RC_Success = DbDaemonLifetimeCommon.RC_Success; + public static final Integer RC_Help = DbDaemonLifetimeCommon.RC_Help; + + private String __delete = "--delete"; + private String __query = "--query"; + private String __quiesce = "--quiesce"; + private String __start = "--start"; + private String __stop = "--stop"; + + private String[] kw_list = { __delete, __query, __quiesce, __start, __stop }; + private String[] daemon_list = { Daemon.Ag.lcname(), Daemon.Or.lcname(), Daemon.Pm.lcname(), Daemon.Rm.lcname(), Daemon.Sm.lcname(), Daemon.Ws.lcname(), Daemon.Br.lcname(), Daemon.Db.lcname() }; + + private String help_selector = "<"+__delete+"|"+__query+"|"+__quiesce+"|"+__start+"|"+__stop+">"; + private String help_host = "<host>"; + private String help_daemons = + "<" + +Daemon.Ag.lcname() + +"|" + +Daemon.Or.lcname() + +"|" + +Daemon.Pm.lcname() + +"|" + +Daemon.Rm.lcname() + +"|" + +Daemon.Sm.lcname() + +"|" + +Daemon.Ws.lcname() + +"|" + +Daemon.Br.lcname() + +"|" + +Daemon.Db.lcname() + + + ">" + ; + + private String message_specify = "specify one of " + +__delete + +", " + +__query + +", " + +__quiesce + +", " + +__start + +", " + +__stop + +""; + + private String help_msg = help_selector+" "+help_host+" "+help_daemons+"\n"; + + private DbDaemonLifetime dbDaemonLifetime = null; + + private DbDaemonLifetimeUI() { + Level level = Level.DEBUG; + dbDaemonLifetime = new DbDaemonLifetime(level); + } + + private void help() { + System.out.println(help_msg); + System.exit(RC_Help); + } + + private boolean is_valid_host(String arg) { + boolean retVal = false; + if(arg.length() > 0) { + retVal = true; + } + return retVal; + } + + private boolean is_valid(String needle, String[] haystack) { + boolean retVal = false; + if(needle != null) { + if(haystack != null) { + for(String item : haystack) { + 
if(needle.equalsIgnoreCase(item)) { + retVal = true; + break; + } + } + } + } + return retVal; + } + + private boolean is_valid_kw(String arg) { + return is_valid(arg,kw_list); + } + + private boolean is_valid_daemon(String arg) { + return is_valid(arg,daemon_list); + } + + private int delete(String host, String daemon) { + DbResult dbResult = dbDaemonLifetime.delete(host,daemon); + return dbResult.rc; + } + + private int query(String host, String daemon) { + DbResult dbResult = dbDaemonLifetime.query(host,daemon); + List<DaemonInfo> list = dbResult.list; + if(list != null) { + for(DaemonInfo di : list) { + System.out.println(di.host+"."+di.daemon+"="+di.state); + } + } + return dbResult.rc; + } + + private int quiesce(String host, String daemon, Long tod) { + DbResult dbResult = dbDaemonLifetime.quiesce(host,daemon,tod); + return dbResult.rc; + } + + private int start(String host, String daemon, Long tod) { + DbResult dbResult = dbDaemonLifetime.start(host,daemon,tod); + return dbResult.rc; + } + + private int stop(String host, String daemon, Long tod) { + DbResult dbResult; + if((host == null) || (daemon == null)) { + dbResult = dbDaemonLifetime.query(host,daemon); + if(dbResult.rc == RC_Success) { + List<DaemonInfo> list = dbResult.list; + if(list != null) { + for(DaemonInfo di : list) { + stop(di.host, di.daemon, tod); + } + } + } + } + else { + dbResult = dbDaemonLifetime.stop(host,daemon,tod); + } + return dbResult.rc; + } + + private int mainline(String[] args) { + int rc = RC_Failure; + Long tod = System.currentTimeMillis(); + String kw = null; + if(args.length > 0) { + kw = DbDaemonLifetimeCommon.normalize_kw(args[0]); + } + String host = null; + if(args.length > 1) { + host = DbDaemonLifetimeCommon.normalize_host(args[1]); + } + String daemon = null; + if(args.length > 2) { + daemon = DbDaemonLifetimeCommon.normalize_daemon(args[2]); + } + // + if(args.length < 1) { + String message = "no arguments specified"; + System.out.println(message); + help(); + } 
+ // + if(args.length == 1) { + if(!is_valid_kw(kw)) { + System.out.println(message_specify); + help(); + } + if(kw.equalsIgnoreCase(__query)) { + rc = query(host,daemon); + } + else if(kw.equalsIgnoreCase(__stop)) { + rc = stop(host,daemon,tod); + } + else { + String message = "specify host and daemon"; + System.out.println(message); + help(); + } + } + // + else if(args.length == 2) { + if(!is_valid_kw(kw)) { + System.out.println(message_specify); + help(); + } + if(!is_valid_host(host)) { + help(); + } + if(kw.equalsIgnoreCase(__query)) { + rc = query(host,daemon); + } + else if(kw.equalsIgnoreCase(__stop)) { + rc = stop(host,daemon,tod); + } + else { + String message = "specify host and daemon"; + System.out.println(message); + help(); + } + } + // + else if(args.length == 3) { + if(!is_valid_kw(kw)) { + System.out.println(message_specify); + help(); + } + if(!is_valid_host(host)) { + help(); + } + if(!is_valid_daemon(daemon)) { + help(); + } + if(kw.equalsIgnoreCase(__query)) { + rc = query(host,daemon); + } + else if(kw.equalsIgnoreCase(__delete)) { + rc = delete(host,daemon); + } + else if(kw.equalsIgnoreCase(__quiesce)) { + rc = quiesce(host,daemon,tod); + } + else if(kw.equalsIgnoreCase(__start)) { + rc = start(host,daemon,tod); + } + else if(kw.equalsIgnoreCase(__stop)) { + rc = stop(host,daemon,tod); + } + else { + System.out.println(message_specify); + help(); + } + } + // + else { + String message = "too many arguments"; + System.out.println(message); + help(); + } + return rc; + } + + public static void main(String[] args) { + int rc = RC_Failure; + try { + DbDaemonLifetimeUI instance = new DbDaemonLifetimeUI(); + rc = instance.mainline(args); + } + catch(Exception e) { + e.printStackTrace(); + } + System.exit(rc); + } +} Propchange: uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/DbDaemonLifetimeUI.java ------------------------------------------------------------------------------ svn:eol-style = native 
Added: uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/IDbDaemonLifetime.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/IDbDaemonLifetime.java?rev=1838082&view=auto ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/IDbDaemonLifetime.java (added) +++ uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/IDbDaemonLifetime.java Wed Aug 15 12:05:15 2018 @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +*/ +package org.apache.uima.ducc.database.lifetime; + +import java.util.List; + +import org.apache.uima.ducc.common.persistence.IDbProperty; + +/* + * Interface to manage table of DUCC daemons & desired state. 
 */ +public interface IDbDaemonLifetime { /* Contract for the "daemonLifetime" table: its property/column schema, the daemon and desired-state vocabularies, the row and result carrier classes, and the host/daemon-addressed operations declared at the end. */ + + public enum DaemonLifetimeProperties implements IDbProperty { /* One constant per table property: TABLE_NAME supplies the table name; host, daemon, state and tod are the columns. Per-constant bodies override the defaults declared after the constant list. Constant order matters (see the primary-key note below). */ + TABLE_NAME { + public String pname() { + return "daemonLifetime"; + } + + public Type type() { + return Type.String; + } + }, + // The order of the primary keys is important here as the Db assigns + // semantics to the first key in a compound PK + host { /* first component of the compound primary key */ + public boolean isPrimaryKey() { + return true; + } + }, + daemon { /* second component of the compound primary key */ + public boolean isPrimaryKey() { + return true; + } + }, + state { /* no overrides: String-typed, non-key column. NOTE(review): presumably stores a DesiredState name; confirm against the implementation */ + + }, + tod { /* the only Long-typed column. NOTE(review): name suggests a time-of-day/timestamp value; confirm units with the implementation */ + public Type type() { + return Type.Long; + } + }, + ; /* Defaults for every constant that does not override them: pname is the constant name, type is String, all boolean flags are false, and columnName delegates to pname. */ + + public String pname() { + return name(); + } + + public Type type() { + return Type.String; + } + + public boolean isPrimaryKey() { + return false; + } + + public boolean isPrivate() { + return false; + } + + public boolean isMeta() { + return false; + } + + public boolean isIndex() { + return false; + } + + public String columnName() { + return pname(); + } + } + + public enum Daemon { /* Short codes for DUCC daemons; lcname() returns the lower-case form of a code (e.g. Ag -> "ag"). */ + Ag, + Or, + Pm, + Rm, + Sm, + Ws, + Br, + Db, + ; + public String lcname() { + return name().toLowerCase(); + } + } + + public enum DesiredState { /* Values named after the start, stop and quiesce operations declared below. */ + Start, + Stop, + Quiesce, + } + + public class DaemonInfo { /* One table row: host and daemon (the compound primary key), state and tod. As a member class of an interface it is implicitly public and static. */ + public String host = null; + public String daemon = null; + public String state = null; + public Long tod = null; + public DaemonInfo(String host, String daemon, String state, Long tod) { + this.host = host; + this.daemon = daemon; + this.state = state; + this.tod = tod; + } + } + + public class DbResult { /* Operation outcome: rc starts as RC_Failure (copied from DbDaemonLifetimeCommon) and list stays null unless an implementation fills it with DaemonInfo rows. */ + public static final Integer RC_Failure = DbDaemonLifetimeCommon.RC_Failure; + public int rc = RC_Failure; + public List<DaemonInfo> list = null; + } + + /* Operations addressed by host and daemon; quiesce, start and stop additionally take a tod value. Each returns a DbResult carrying rc and an optional list of rows. */ public DbResult delete(String host, String daemon); + public DbResult query(String host, String daemon); + public DbResult quiesce(String host, String daemon, Long tod); + public DbResult start(String host, String daemon, Long tod); + public DbResult stop(String host, String daemon, Long tod); + +} Propchange: 
uima/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/lifetime/IDbDaemonLifetime.java ------------------------------------------------------------------------------ svn:eol-style = native