Author: challngr
Date: Fri Dec 11 14:18:06 2015
New Revision: 1719426

URL: http://svn.apache.org/viewvc?rev=1719426&view=rev
Log:
UIMA-4577 Redo rm_qoccupancy using cqlsh because it's so much faster than spawning java!
Modified:
    uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy

Modified: uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy?rev=1719426&r1=1719425&r2=1719426&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy (original)
+++ uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy Fri Dec 11 14:18:06 2015
@@ -37,12 +37,12 @@ class DuccRmQOccupancy(DuccUtil):
                 status = 'up'
             else:
                 status = 'down'
-            print "%20s %11s %6s %6s %15s %10s %3s(Q) %6s %6s %8s %7s %10s %8s" % (n['name'], n['blacklisted'], n['online'], status, n['nodepool'],
+            print "%20s %11s %6s %6s %15s %10s %3s(Q) %6s %6s %8s %7s %10s %-8s" % (n['name'], n['blacklisted'], n['online'], status, n['nodepool'],
                     n['memory'], n['share_order'], n['shares_left'], n['assignments'],
                     n['np_assignments'], n['quantum'], n['reservable'], n['classes'])
             if ( shares.has_key(n['name']) ):
                 for s in shares[n['name']]:
-                    fmt = '%19s ' + s['jobtype'] +'[%8d] S[%8d] O[%d] II[%8d] IR[%8d] E[%5s] P[%5s] F[%5s] S[%10s]'
+                    fmt = '%19s ' + s['jobtype'] +'[%8s] S[%8s] O[%s] II[%8s] IR[%8s] E[%5s] P[%5s] F[%5s] S[%10s]'
                     state = s['state']
                     if ( state == 'null' ):
                         state = "Assigned"
@@ -50,13 +50,66 @@ class DuccRmQOccupancy(DuccUtil):

             print ''

-
-    # Given DUCC_HOME, a directory, and part of the name of a jar, find the actual name of the jar which will
-    # likeley be versioned
-    def resolve_jar(self, DH, dirname, basename):
-        partial = DH + '/' + dirname + '/'+ basename + '*'
-        ret = glob.glob(partial)
-        return ret[0]
+
+    def parse_header(self, header):
+        ret = []
+        parts = header.split('|')
+        for p in parts:
+            ret.append(p.strip())
+        return ret
+
+    def parse_node(self, header, line):
+        parts = line.split('|')
+        ret = {}
+        for k, v in zip(header, parts):
+            ret[k] = v.strip()
+        return ret
+
+    def parse_share(self, header, line):
+        parts = line.split('|')
+        ret = {}
+        for k, v in zip(header, parts):
+            ret[k] = v.strip()
+        return ret
+
+
+    def rmnodes(self, lines):
+        nodes = []
+        shares = {}
+        header = []
+        for l in lines:
+            l = l.strip()
+            # print '[]', l
+            if ( l == '' ):
+                continue
+            if ( '---' in l ):
+                continue;
+            if ( 'rows)' in l ):
+                continue;
+            if ( 'assignments' in l ):
+                doing_nodes = True
+                doing_shares = False
+                header = self.parse_header(l)
+                continue
+            if ( 'investment' in l ):
+                doing_nodes = False
+                doing_shares = True
+                header = self.parse_header(l)
+                continue
+            if ( doing_nodes ):
+                nodes.append(self.parse_node(header, l))
+                continue
+            if ( doing_shares ):
+                s = self.parse_share(header, l)
+                k = s['node']
+                if ( shares.has_key(k) ):
+                    share_list = shares[k]
+                else:
+                    share_list = []
+                    shares[k] = share_list
+                share_list.append(s)
+                continue
+        return nodes, shares

     def main(self, argv):

@@ -65,53 +118,20 @@ class DuccRmQOccupancy(DuccUtil):
             sys.exit(1);

         DH = self.DUCC_HOME
-        CP = [self.resolve_jar(DH, '/lib/uima-ducc', 'uima-ducc-database'),
-              self.resolve_jar(DH, '/lib/uima-ducc', 'uima-ducc-common'),
-              DH + '/lib/cassandra/*',
-              DH + '/lib/apache-log4j/*',
-              DH + '/lib/guava/*',
-              self.resolve_jar(DH, '/cassandra-server/lib', 'slf4j-api'),
-              self.resolve_jar(DH, '/apache-uima/apache-activemq/lib', 'slf4j-log4j12'),
-              ]
-        os.environ['CLASSPATH'] = ':'.join(CP)
-
-        DUCC_JVM_OPTS = ''
-        DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -DDUCC_HOME=' + self.DUCC_HOME
-        DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.rm.persistence.impl=org.apache.uima.ducc.database.RmStatePersistence'
-
         dbn = self.ducc_properties.get('ducc.database.host')

-        CMD = [self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.database.RmShareState', dbn]
-        CMD = ' '.join(CMD)
-        lines = ''
-        proc = subprocess.Popen(CMD, bufsize=0, stdout=subprocess.PIPE, shell=True)
-        for line in proc.stdout:
-            lines = lines + line
-        shares = eval(lines)
-
-        shares_by_machine = {}
-        for share in shares:
-            k = share['node']
-            if ( shares_by_machine.has_key(k) ):
-                share_list = shares_by_machine[k]
-            else:
-                share_list = []
-                shares_by_machine[k] = share_list
-            share_list.append(share)
-
-        CMD = [self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.database.RmNodeState', dbn]
+        CMD = [DH + '/cassandra-server/bin/cqlsh', dbn, '-u', 'guest', '-p', 'guest', '-e', '"select * from ducc.rmnodes; select * from ducc.rmshares;"']
         CMD = ' '.join(CMD)
-        lines = ''
+
+        lines = []
         proc = subprocess.Popen(CMD, bufsize=0, stdout=subprocess.PIPE, shell=True)
         for line in proc.stdout:
-            lines = lines + line
+            # print line.strip()
+            lines.append(line)
+
+        nodes, shares = self.rmnodes(lines)
+        self.format(nodes, shares)

-        nodes = eval(lines)
-        nodes = sorted(nodes, key=lambda n: n["name"])
-
-        self.format(nodes, shares_by_machine)
-
-        return

 if __name__ == "__main__":
     stopper = DuccRmQOccupancy()