This is an automated email from the git hooks/post-receive script.

afif pushed a commit to branch master
in repository falcon.
commit 58a5f3293b83f7d435c8cd3a39912134455746a6
Author: Afif Elghraoui <[email protected]>
Date:   Sat Nov 26 20:06:30 2016 -0800

    Imported Upstream version 1.8.3
---
 FALCON-examples/git-sym.makefile              |   2 +
 FALCON-examples/makefile                      |   6 +-
 FALCON-examples/run/greg200k-sv2/README.md    |   1 +
 FALCON-examples/run/greg200k-sv2/fc_run.cfg   |  44 ++
 FALCON-examples/run/greg200k-sv2/fc_unzip.cfg |  21 +
 FALCON-examples/run/greg200k-sv2/input.fofn   |   2 +
 FALCON-examples/run/greg200k-sv2/makefile     |  10 +
 FALCON-examples/run/synth0/makefile           |   8 -
 FALCON-make/makefile                          |   2 +-
 FALCON/falcon_kit/bash.py                     |   4 +-
 FALCON/falcon_kit/mains/fetch_reads.py        |  72 +--
 FALCON/falcon_kit/mains/get_read_ctg_map.py   | 180 +++---
 FALCON/falcon_kit/mains/pr_ctg_track.py       |   6 +-
 FALCON/falcon_kit/mains/rr_ctg_track.py       |  10 +-
 FALCON/falcon_kit/mains/run1.py               |  10 +-
 FALCON/test/test_functional.py                |   1 -
 pypeFLOW/pypeflow/common.py                   | 149 -----
 pypeFLOW/pypeflow/controller.py               | 875 -------------------------
 pypeFLOW/pypeflow/data.py                     | 315 ---------
 pypeFLOW/pypeflow/pwatcher_bridge.py          | 391 -----------
 pypeFLOW/pypeflow/pwatcher_workflow.py        |   9 +
 pypeFLOW/pypeflow/simple_pwatcher_bridge.py   |  14 +-
 pypeFLOW/pypeflow/task.py                     | 892 --------------------
 pypeFLOW/setup.py                             |   9 +-
 24 files changed, 247 insertions(+), 2786 deletions(-)

diff --git a/FALCON-examples/git-sym.makefile b/FALCON-examples/git-sym.makefile
index aecdd97..ed9b703 100644
--- a/FALCON-examples/git-sym.makefile
+++ b/FALCON-examples/git-sym.makefile
@@ -27,3 +27,5 @@ synth5k.2016-11-02:
 	curl -L https://downloads.pacbcloud.com/public/data/git-sym/synth5k.2016-11-02.tgz | tar xvfz -
 ecoli.m140913_050931_42139_c100713652400000001823152404301535_s1_p0:
 	curl -L https://downloads.pacbcloud.com/public/data/git-sym/ecoli.m140913_050931_42139_c100713652400000001823152404301535_s1_p0.subreads.tar | tar xvf -
+greg200k-sv2:
+	curl -L https://downloads.pacbcloud.com/public/data/git-sym/greg200k-sv2.tar | tar xvf -
diff --git a/FALCON-examples/makefile b/FALCON-examples/makefile
index 57a7459..d600c35 100644
--- a/FALCON-examples/makefile
+++ b/FALCON-examples/makefile
@@ -8,13 +8,9 @@ setup-%:
 	git-sym check run/$*
 # Our only integration test, for now.
 test:
-	python -c 'import pypeflow.common; print pypeflow.common'
+	python -c 'import pypeflow.pwatcher_workflow; print pypeflow.pwatcher_workflow'
 	python -c 'import falcon_kit; print falcon_kit.falcon'
 	${MAKE} run-synth0
 	${MAKE} -C run/synth0 test
-	#${MAKE} -C run/synth0 clean
-	#${MAKE} -C run/synth0 go0 # still test the old pypeflow too, for now
-	${MAKE} -C run/synth0 clean
-	${MAKE} -C run/synth0 go1 # should be the same as go
 .PHONY: default
diff --git a/FALCON-examples/run/greg200k-sv2/README.md b/FALCON-examples/run/greg200k-sv2/README.md
new file mode 100644
index 0000000..85c3a80
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/README.md
@@ -0,0 +1 @@
+This needs to be updated again. Still a WIP.
diff --git a/FALCON-examples/run/greg200k-sv2/fc_run.cfg b/FALCON-examples/run/greg200k-sv2/fc_run.cfg
new file mode 100755
index 0000000..a8fb8c4
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/fc_run.cfg
@@ -0,0 +1,44 @@
+[General]
+# list of files of the initial bas.h5 files
+input_fofn = input.fofn
+#input_fofn = preads.fofn
+
+job_type = local
+
+input_type = raw
+#input_type = preads
+
+#openending = True
+
+# The length cutoff used for seed reads used for initial mapping
+length_cutoff = 1000
+genome_size = 200000
+#seed_coverage = 60
+
+# The length cutoff used for seed reads usef for pre-assembly
+length_cutoff_pr = 1
+
+
+sge_option_da = -pe smp 4 -q bigmem
+sge_option_la = -pe smp 20 -q bigmem
+sge_option_pda = -pe smp 6 -q bigmem
+sge_option_pla = -pe smp 16 -q bigmem
+sge_option_fc = -pe smp 24 -q bigmem
+sge_option_cns = -pe smp 8 -q bigmem
+
+#192 ?
+pa_concurrent_jobs = 12
+cns_concurrent_jobs = 12
+ovlp_concurrent_jobs = 12
+
+pa_HPCdaligner_option = -v -dal128 -t16 -e0.8 -M24 -l3200 -k18 -h480 -w8 -s100
+ovlp_HPCdaligner_option = -v -dal128 -M24 -k24 -h1024 -e.9 -l2500 -s100
+
+pa_DBsplit_option = -a -x500 -s100
+ovlp_DBsplit_option = -s100
+
+falcon_sense_option = --output_multi --min_cov_aln 4 --min_idt 0.70 --min_cov 4 --max_n_read 200 --n_core 8
+falcon_sense_skip_contained = False
+
+overlap_filtering_setting = --max_diff 120 --max_cov 120 --min_cov 2 --n_core 12
+#dazcon = 1
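For orientation, fc_run.py reads this file with a ConfigParser-style loader, so
each `key = value` pair under [General] becomes one pipeline setting. A minimal
sketch of pulling a few of the values above out of the file, assuming only the
stock Python 2 ConfigParser rather than FALCON's actual loader:

    # Sketch only: read a few [General] settings from fc_run.cfg.
    from ConfigParser import ConfigParser  # Python 2, matching this codebase

    cfg = ConfigParser()
    cfg.read('fc_run.cfg')
    length_cutoff = cfg.getint('General', 'length_cutoff')  # 1000
    genome_size = cfg.getint('General', 'genome_size')      # 200000
    pa_opts = cfg.get('General', 'pa_HPCdaligner_option')   # daligner flags
    print length_cutoff, genome_size, pa_opts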
diff --git a/FALCON-examples/run/greg200k-sv2/fc_unzip.cfg b/FALCON-examples/run/greg200k-sv2/fc_unzip.cfg
new file mode 100644
index 0000000..c9de3af
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/fc_unzip.cfg
@@ -0,0 +1,21 @@
+[General]
+job_type = SGE
+job_type = local
+
+[Unzip]
+
+input_fofn= input.fofn
+input_bam_fofn= input_bam.fofn
+#smrt_bin= /mnt/secondary/builds/full/3.0.0/prod/smrtanalysis_3.0.0.153854/smrtcmds/bin/
+#smrt_bin=/mnt/secondary/builds/full/3.0.1/prod/current-build_smrtanalysis/smrtcmds/bin/
+smrt_bin=/mnt/secondary/builds/full/3.0.0/prod/current-build_smrtanalysis/smrtcmds/bin/
+sge_phasing= -pe smp 12 -q bigmem
+sge_quiver= -pe smp 12 -q sequel-farm
+sge_track_reads= -pe smp 12 -q default
+sge_blasr_aln= -pe smp 24 -q bigmem
+sge_hasm= -pe smp 48 -q bigmem
+unzip_concurrent_jobs = 64
+quiver_concurrent_jobs = 64
+
+unzip_concurrent_jobs = 12
+quiver_concurrent_jobs = 12
diff --git a/FALCON-examples/run/greg200k-sv2/input.fofn b/FALCON-examples/run/greg200k-sv2/input.fofn
new file mode 100644
index 0000000..99cce2d
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/input.fofn
@@ -0,0 +1,2 @@
+data/greg200k-sv2/subreads1.dexta
+data/greg200k-sv2/subreads2.dexta
diff --git a/FALCON-examples/run/greg200k-sv2/makefile b/FALCON-examples/run/greg200k-sv2/makefile
new file mode 100644
index 0000000..099e5ae
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/makefile
@@ -0,0 +1,10 @@
+falcon:
+	fc_run.py fc_run.cfg
+unzip:
+	fc_unzip.py fc_unzip.cfg
+quiver:
+	fc_quiver.py fc_unzip.cfg
+clean:
+	rm -rf 3-*/ mypwatcher/ pwatcher.dir
+cleaner: clean
+	rm -rf 0-*/ 1-*/ 2-*/ all.log mypwatcher/ scripts/ sge_log/
diff --git a/FALCON-examples/run/synth0/makefile b/FALCON-examples/run/synth0/makefile
index d471427..e7ef88e 100644
--- a/FALCON-examples/run/synth0/makefile
+++ b/FALCON-examples/run/synth0/makefile
@@ -5,14 +5,6 @@ go: run
 	${MAKE} test
 run:
 	fc_run fc_run.cfg logging.json
-go1: run1
-	${MAKE} test
-run1:
-	fc_run1 fc_run.cfg logging.ini
-go0: run0
-	${MAKE} test
-run0:
-	fc_run0 fc_run.cfg logging.ini
 test:
 	./check.py
 clean:
diff --git a/FALCON-make/makefile b/FALCON-make/makefile
index f6c0d95..52b42d3 100644
--- a/FALCON-make/makefile
+++ b/FALCON-make/makefile
@@ -53,7 +53,7 @@ show:
 	echo "FALCON_PIP_EDIT=${FALCON_PIP_EDIT}"
 	echo "FALCON_PIP_USER=${FALCON_PIP_USER}"
check:
-	python -c 'import pypeflow.common; print pypeflow.common'
+	python -c 'import pypeflow.simple_pwatcher_bridge; print pypeflow.simple_pwatcher_bridge'
 	python -c 'import falcon_kit; print falcon_kit.falcon'
extra:
 	pip install ${FALCON_PIP_USER} Cython
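One quirk worth noting in fc_unzip.cfg as committed: job_type,
unzip_concurrent_jobs, and quiver_concurrent_jobs are each assigned twice.
With a ConfigParser-style reader the later assignment wins, so the effective
values should be local, 12, and 12. A quick check under that assumption
(a sketch, not part of the commit):

    # Sketch only: confirm last-assignment-wins for duplicated keys.
    from ConfigParser import ConfigParser  # Python 2 allows duplicates

    cfg = ConfigParser()
    cfg.read('fc_unzip.cfg')
    print cfg.get('General', 'job_type')             # -> local
    print cfg.get('Unzip', 'unzip_concurrent_jobs')  # -> 12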
diff --git a/FALCON/falcon_kit/bash.py b/FALCON/falcon_kit/bash.py
index fabe142..dbf3378 100644
--- a/FALCON/falcon_kit/bash.py
+++ b/FALCON/falcon_kit/bash.py
@@ -262,8 +262,8 @@ ln -sf ${{db_dir}}/{db_prefix}.db .
 ln -sf ${{db_dir}}/.{db_prefix}.dust.anno .
 ln -sf ${{db_dir}}/.{db_prefix}.dust.data .
 {daligner_cmd}
-rm -f *.C?.las *.C?.S.las
-rm -f *.N?.las *.N?.S.las
+rm -f *.C?.las *.C?.S.las *.C??.las *.C??.S.las *.C???.las *.C???.S.las
+rm -f *.N?.las *.N?.S.las *.N??.las *.N??.S.las *.N???.las *.N???.S.las
 """.format(db_dir=db_dir, db_prefix=db_prefix, daligner_cmd=daligner_cmd)
         yield job_uid, bash
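The widened patterns above simply extend the single-character `?` glob so the
cleanup also catches two- and three-digit block numbers in the intermediate
.las names. A rough Python equivalent of that cleanup, for illustration only
(FALCON itself emits the shell lines shown in the hunk):

    # Sketch: remove intermediate .las files for 1- to 3-digit block numbers.
    # Uses [0-9] where the shell uses ?, so it is slightly stricter.
    import glob, os

    for pat in ('*.C%s.las', '*.C%s.S.las', '*.N%s.las', '*.N%s.S.las'):
        for digits in ('[0-9]', '[0-9][0-9]', '[0-9][0-9][0-9]'):
            for path in glob.glob(pat % digits):
                os.remove(path)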
diff --git a/FALCON/falcon_kit/mains/fetch_reads.py b/FALCON/falcon_kit/mains/fetch_reads.py
index c4576c9..9f7b458 100644
--- a/FALCON/falcon_kit/mains/fetch_reads.py
+++ b/FALCON/falcon_kit/mains/fetch_reads.py
@@ -6,45 +6,41 @@ import sys
 import re
-
-
 def fetch_ref_and_reads(base_dir, fofn, ctg_id, out_dir, min_ctg_lenth):
-
-
     read_fofn = fofn
     if out_dir == None:
-        out_dir = os.path.join( base_dir, "3-unzip/reads")
+        out_dir = os.path.join(base_dir, '3-unzip/reads')
 
-    ctg_fa = os.path.join( base_dir, "2-asm-falcon/p_ctg.fa")
-    read_map_dir = os.path.join( base_dir, "2-asm-falcon/read_maps" )
+    ctg_fa = os.path.join(base_dir, '2-asm-falcon/p_ctg.fa')
+    read_map_dir = os.path.join(base_dir, '2-asm-falcon/read_maps')
 
-    rawread_id_file = os.path.join( read_map_dir, "raw_read_ids" )
-    pread_id_file = os.path.join( read_map_dir, "pread_ids" )
+    rawread_id_file = os.path.join(read_map_dir, 'dump_rawread_ids', 'rawread_ids')
+    pread_id_file = os.path.join(read_map_dir, 'dump_pread_ids', 'pread_ids')
 
-    rid_to_oid = open(rawread_id_file).read().split("\n") #daligner raw read id to the original ids
-    pid_to_fid = open(pread_id_file).read().split("\n") #daligner pread id to the fake ids
+    rid_to_oid = open(rawread_id_file).read().split('\n') #daligner raw read id to the original ids
+    pid_to_fid = open(pread_id_file).read().split('\n') #daligner pread id to the fake ids
 
     def pid_to_oid(pid):
         fid = pid_to_fid[int(pid)]
-        rid = int(fid.split("/")[1])/10
+        rid = int(fid.split('/')[1])/10
         return rid_to_oid[int(rid)]
 
     ref_fasta = FastaReader(ctg_fa)
     all_ctg_ids = set()
     for s in ref_fasta:
         s_id = s.name.split()[0]
-        if ctg_id != "all" and s_id != ctg_id:
+        if ctg_id != 'all' and s_id != ctg_id:
             continue
 
         if len(s.sequence) < min_ctg_lenth:
             continue
 
-        if ctg_id != "all":
-            ref_out = open( os.path.join( out_dir, "%s_ref.fa" % ctg_id), "w" )
+        if ctg_id != 'all':
+            ref_out = open( os.path.join( out_dir, '%s_ref.fa' % ctg_id), 'w' )
         else:
-            ref_out = open( os.path.join( out_dir, "%s_ref.fa" % s_id), "w" )
+            ref_out = open( os.path.join( out_dir, '%s_ref.fa' % s_id), 'w' )
 
-        print >>ref_out, ">%s" % s_id
+        print >>ref_out, '>%s' % s_id
         print >>ref_out, s.sequence
         all_ctg_ids.add(s_id)
         ref_out.close()
@@ -53,71 +49,71 @@ def fetch_ref_and_reads(base_dir, fofn, ctg_id, out_dir, min_ctg_lenth):
     read_set = {}
     ctg_id_hits = {}
 
-    map_fn = os.path.join(read_map_dir,"rawread_to_contigs")
-    with open(map_fn, "r") as f:
+    map_fn = os.path.join(read_map_dir,'rawread_to_contigs')
+    with open(map_fn, 'r') as f:
         for row in f:
             row = row.strip().split()
             hit_ctg = row[1]
-            hit_ctg = hit_ctg.split("-")[0]
+            hit_ctg = hit_ctg.split('-')[0]
             if int(row[3]) == 0:
                 o_id = rid_to_oid[int(row[0])]
                 read_set[o_id] = hit_ctg
                 ctg_id_hits[hit_ctg] = ctg_id_hits.get(hit_ctg, 0) + 1
 
-    map_fn = os.path.join(read_map_dir,"pread_to_contigs")
-    with open(map_fn, "r") as f:
+    map_fn = os.path.join(read_map_dir,'pread_to_contigs')
+    with open(map_fn, 'r') as f:
         for row in f:
             row = row.strip().split()
             hit_ctg = row[1]
-            hit_ctg = hit_ctg.split("-")[0]
+            hit_ctg = hit_ctg.split('-')[0]
             if hit_ctg not in read_set and int(row[3]) == 0:
                 o_id = pid_to_oid(row[0])
                 read_set[o_id] = hit_ctg
                 ctg_id_hits[hit_ctg] = ctg_id_hits.get(hit_ctg, 0) + 1
 
-    with open(os.path.join( out_dir, "ctg_list"),"w") as f:
+    with open(os.path.join( out_dir, 'ctg_list'),'w') as f:
         for ctg_id in sorted(list(all_ctg_ids)):
             if ctg_id_hits.get(ctg_id, 0) < 5:
                 continue
-            if ctg_id[-1] not in ["F", "R"]: #ignore small circle contigs, they need different approach
+            if ctg_id[-1] not in ['F', 'R']: #ignore small circle contigs, they need different approach
                 continue
             print >>f, ctg_id
 
     read_out_files = {}
-    with open(read_fofn, "r") as f:
+    with open(read_fofn, 'r') as f:
         for r_fn in f:
             r_fn = r_fn.strip()
             read_fa_file = FastaReader(r_fn)
             for r in read_fa_file:
                 rid = r.name.split()[0]
                 if rid not in read_set:
-                    ctg_id = "unassigned"
+                    ctg_id = 'unassigned'
                 else:
                     ctg_id = read_set[rid]
 
-                if ctg_id == "NA" or ctg_id not in all_ctg_ids:
-                    ctg_id = "unassigned"
+                if ctg_id == 'NA' or ctg_id not in all_ctg_ids:
+                    ctg_id = 'unassigned'
 
                 if ctg_id not in read_out_files:
-                    read_out = open( os.path.join( out_dir, "%s_reads.fa" % ctg_id), "w" )
+                    read_out = open( os.path.join( out_dir, '%s_reads.fa' % ctg_id), 'w' )
                     read_out_files[ctg_id] = 1
                 else:
-                    read_out = open( os.path.join( out_dir, "%s_reads.fa" % ctg_id), "a" )
+                    read_out = open( os.path.join( out_dir, '%s_reads.fa' % ctg_id), 'a' )
 
-                print >>read_out, ">"+rid
+                print >>read_out, '>'+rid
                 print >>read_out, r.sequence
                 read_out.close()
 
 def parse_args(argv):
     parser = argparse.ArgumentParser(description='using the read to contig mapping data to partition the reads grouped by contigs')
-    parser.add_argument('--base_dir', type=str, default="./", help='the base working dir of a falcon assembly')
-    parser.add_argument('--fofn', type=str, default="./input.fofn", help='path to the file of the list of raw read fasta files')
-    parser.add_argument('--ctg_id', type=str, default="all", help='contig identifier in the contig fasta file')
+    parser.add_argument('--base_dir', type=str, default='./', help='the base working dir of a falcon assembly')
+    parser.add_argument('--fofn', type=str, default='./input.fofn', help='path to the file of the list of raw read fasta files')
+    parser.add_argument('--ctg_id', type=str, default='all', help='contig identifier in the contig fasta file')
     parser.add_argument('--out_dir', default=None, type=str, help='the output base_dir, default to `base_dir/3-unzip/reads` directory')
     parser.add_argument('--min_ctg_lenth', default=20000, type=int, help='the minimum length of the contig for the outputs, default=20000')
-    #parser.add_argument('--ctg_fa', type=str, default="./2-asm-falcon/p_ctg.fa", help='path to the contig fasta file')
-    #parser.add_argument('--read_map_dir', type=str, default="./2-asm-falcon/read_maps", help='path to the read-contig map directory')
+    #parser.add_argument('--ctg_fa', type=str, default='./2-asm-falcon/p_ctg.fa', help='path to the contig fasta file')
+    #parser.add_argument('--read_map_dir', type=str, default='./2-asm-falcon/read_maps', help='path to the read-contig map directory')
     # we can run this in parallel mode in the furture
     #parser.add_argument('--n_core', type=int, default=4,
     #                    help='number of processes used for generating consensus')
@@ -129,5 +125,5 @@ def main(argv=sys.argv):
     args = parse_args(argv)
     fetch_ref_and_reads(**vars(args))
 
-if __name__ == "__main__":
+if __name__ == '__main__':
     main()
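The pid_to_oid helper in the hunk above recovers a raw-read index from a
pread's "fake" id: the second /-separated field of the fake id is ten times
the internal raw-read id, which then indexes the original-id table. A
standalone sketch of the same arithmetic, with made-up id strings and tables
(the real ones come from the dump_rawread_ids/dump_pread_ids files):

    # Sketch of the pread-id -> raw-read-id -> original-id chain.
    pid_to_fid = ['fake/000050/0_999']  # index = daligner pread id (made up)
    rid_to_oid = ['read%d' % i for i in range(10)]  # made-up original ids

    def pid_to_oid(pid):
        fid = pid_to_fid[int(pid)]         # 'fake/000050/0_999'
        rid = int(fid.split('/')[1]) / 10  # 50 / 10 -> raw-read id 5
        return rid_to_oid[int(rid)]

    print pid_to_oid(0)                    # -> 'read5'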
{"pread_id_file": pread_id_file}, - TaskType = PypeThreadTaskBase, - URL = "task://localhost/dump_pread_ids" ) - def dump_pread_ids(self): - pread_db = fn( self.pread_db ) - pread_id_file = fn( self.pread_id_file ) - os.system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (pread_db, pread_id_file) ) + pread_to_contigs = {} - wf.addTask( dump_pread_ids ) + with open(read_to_contig_map, 'w') as f: + for ctg in asm_G.ctg_data: + if ctg[-1] == 'R': + continue + ctg_g = asm_G.get_sg_for_ctg(ctg) + for n in ctg_g.nodes(): + pid = int(n.split(':')[0]) - wf.refreshTargets() # block + rid = pread_did_to_rid[pid].split('/')[1] + rid = int(int(rid)/10) + oid = rid_to_oid[rid] + k = (pid, rid, oid) + pread_to_contigs.setdefault(k, set()) + pread_to_contigs[k].add(ctg) - sg_edges_list = makePypeLocalFile( os.path.join(asm_dir, "sg_edges_list") ) - utg_data = makePypeLocalFile( os.path.join(asm_dir, "utg_data") ) - ctg_paths = makePypeLocalFile( os.path.join(asm_dir, "ctg_paths") ) - inputs = { "rawread_id_file": rawread_id_file, - "pread_id_file": pread_id_file, - "sg_edges_list": sg_edges_list, - "utg_data": utg_data, - "ctg_paths": ctg_paths } + for k in pread_to_contigs: + pid, rid, oid = k + for ctg in list(pread_to_contigs[ k ]): + print >>f, '%09d %09d %s %s' % (pid, rid, oid, ctg) - read_to_contig_map = makePypeLocalFile( os.path.join(read_map_dir, "read_to_contig_map") ) +def get_read_ctg_map(rawread_dir, pread_dir, asm_dir): + read_map_dir = os.path.abspath(os.path.join(asm_dir, 'read_maps')) + make_dirs(read_map_dir) - @PypeTask( inputs = inputs, - outputs = {"read_to_contig_map": read_to_contig_map}, + wf = PypeProcWatcherWorkflow( + max_jobs=12, + ) + """ + job_type=config['job_type'], + job_queue=config['job_queue'], + sge_option=config.get('sge_option', ''), + watcher_type=config['pwatcher_type'], + watcher_directory=config['pwatcher_directory']) + """ + + rawread_db = makePypeLocalFile(os.path.join(rawread_dir, 'raw_reads.db')) + rawread_id_file = makePypeLocalFile(os.path.join(read_map_dir, 'dump_rawread_ids', 'rawread_ids')) + + task = PypeTask( + inputs = {'rawread_db': rawread_db}, + outputs = {'rawread_id_file': rawread_id_file}, + TaskType = PypeThreadTaskBase, + URL = 'task://localhost/dump_rawread_ids') + wf.addTask(task(dump_rawread_ids)) + + pread_db = makePypeLocalFile(os.path.join(pread_dir, 'preads.db')) + pread_id_file = makePypeLocalFile(os.path.join(read_map_dir, 'dump_pread_ids', 'pread_ids')) + + task = PypeTask( + inputs = {'pread_db': pread_db}, + outputs = {'pread_id_file': pread_id_file}, TaskType = PypeThreadTaskBase, - URL = "task://localhost/get_ctg_read_map" ) - def generate_read_to_ctg_map(self): - rawread_id_file = fn( self.rawread_id_file ) - pread_id_file = fn( self.pread_id_file ) - read_to_contig_map = fn( self.read_to_contig_map ) - - pread_did_to_rid = open(pread_id_file).read().split("\n") - rid_to_oid = open(rawread_id_file).read().split("\n") + URL = 'task://localhost/dump_pread_ids' ) + wf.addTask(task(dump_pread_ids)) - asm_G = AsmGraph(fn(self.sg_edges_list), - fn(self.utg_data), - fn(self.ctg_paths) ) - - pread_to_contigs = {} - - with open(read_to_contig_map, "w") as f: - for ctg in asm_G.ctg_data: - if ctg[-1] == "R": - continue - ctg_g = asm_G.get_sg_for_ctg(ctg) - for n in ctg_g.nodes(): - pid = int(n.split(":")[0]) + wf.refreshTargets() # block - rid = pread_did_to_rid[pid].split("/")[1] - rid = int(int(rid)/10) - oid = rid_to_oid[rid] - k = (pid, rid, oid) - pread_to_contigs.setdefault( k, set() ) - pread_to_contigs[ 
diff --git a/FALCON/falcon_kit/mains/pr_ctg_track.py b/FALCON/falcon_kit/mains/pr_ctg_track.py
index f516866..f1b06fc 100644
--- a/FALCON/falcon_kit/mains/pr_ctg_track.py
+++ b/FALCON/falcon_kit/mains/pr_ctg_track.py
@@ -54,8 +54,8 @@ def tr_stage1(readlines, min_len, bestn, pid_to_ctg):
 def run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn):
     io.LOG('preparing tr_stage1')
     io.logstats()
-    asm_dir = os.path.abspath( os.path.join(base_dir, "2-asm-falcon") )
-    pid_to_ctg = get_pid_to_ctg( os.path.join(asm_dir, "read_maps/read_to_contig_map" ) )
+    asm_dir = os.path.abspath(os.path.join(base_dir, '2-asm-falcon'))
+    pid_to_ctg = get_pid_to_ctg(os.path.join(asm_dir, 'read_maps', 'get_ctg_read_map', 'read_to_contig_map'))
     inputs = []
     for fn in file_list:
         inputs.append( (run_tr_stage1, db_fn, fn, min_len, bestn, pid_to_ctg) )
@@ -135,7 +135,7 @@ def track_reads(n_core, base_dir, min_len, bestn, debug, silent, stream):
 
 def parse_args(argv):
     parser = argparse.ArgumentParser(description='scan the pread overlap information to identify the best hit from the preads \
-to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/read_to_contig_map`',
+to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/get_ctg_read_map/read_to_contig_map`',
                                      formatter_class=argparse.ArgumentDefaultsHelpFormatter)
     parser.add_argument('--n_core', type=int, default=4,
                         help='number of processes used for for tracking reads; '
diff --git a/FALCON/falcon_kit/mains/rr_ctg_track.py b/FALCON/falcon_kit/mains/rr_ctg_track.py
index ad07f48..397d382 100644
--- a/FALCON/falcon_kit/mains/rr_ctg_track.py
+++ b/FALCON/falcon_kit/mains/rr_ctg_track.py
@@ -54,8 +54,8 @@ def tr_stage1(readlines, min_len, bestn, rid_to_ctg):
 def run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn):
     io.LOG('preparing tr_stage1')
     io.logstats()
-    asm_dir = os.path.abspath( os.path.join(base_dir, "2-asm-falcon") )
-    rid_to_ctg = get_rid_to_ctg( os.path.join(asm_dir, "read_maps/read_to_contig_map" ) )
+    asm_dir = os.path.abspath(os.path.join(base_dir, '2-asm-falcon'))
+    rid_to_ctg = get_rid_to_ctg(os.path.join(asm_dir, 'read_maps', 'get_ctg_read_map', 'read_to_contig_map'))
     inputs = []
     for fn in file_list:
         inputs.append( (run_tr_stage1, db_fn, fn, min_len, bestn, rid_to_ctg) )
@@ -75,12 +75,12 @@ def run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn):
         else:
             heappushpop( bread_to_areads[k], item )
 
-    #rid_to_oid = open(os.path.join( rawread_dir, "raw_read_ids" ) ).read().split("\n")
+    #rid_to_oid = open(os.path.join(rawread_dir, 'dump_rawread_ids', 'raw_read_ids')).read().split('\n')
 
     """
    For each b-read, we find the best contig map throgh the b->a->contig map.
     """
-    with open( os.path.join(asm_dir, "read_maps/rawread_to_contigs"), "w") as out_f:
+    with open(os.path.join(asm_dir, 'read_maps/rawread_to_contigs'), 'w') as out_f:
         for bread in bread_to_areads:
 
             ctg_score = {}
@@ -145,7 +145,7 @@ def track_reads(n_core, base_dir, min_len, bestn, debug, silent, stream):
 
 def parse_args(argv):
     parser = argparse.ArgumentParser(description='scan the raw read overlap information to identify the best hit from the reads \
-to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/read_to_contig_map`',
+to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/get_ctg_read_map/read_to_contig_map`',
                                      formatter_class=argparse.ArgumentDefaultsHelpFormatter)
     parser.add_argument('--n_core', type=int, default=4,
                         help='number of processes used for for tracking reads; '
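The heappushpop call visible in the context above is the standard bounded
top-n idiom: keep a min-heap of size bestn so each new item displaces the
current minimum. A self-contained sketch of the idiom, not FALCON's code:

    # Sketch: keep only the best `bestn` scored hits with a bounded min-heap.
    from heapq import heappush, heappushpop

    bestn = 3
    best = []  # min-heap of (score, name)
    for item in [(5, 'a'), (9, 'b'), (1, 'c'), (7, 'd'), (8, 'e')]:
        if len(best) < bestn:
            heappush(best, item)
        else:
            heappushpop(best, item)  # drops the smallest score

    print sorted(best, reverse=True)  # -> [(9, 'b'), (8, 'e'), (7, 'd')]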
diff --git a/FALCON/falcon_kit/mains/run1.py b/FALCON/falcon_kit/mains/run1.py
index cdfb05b..8fd7352 100644
--- a/FALCON/falcon_kit/mains/run1.py
+++ b/FALCON/falcon_kit/mains/run1.py
@@ -1,9 +1,9 @@
 from .. import run_support as support
 from .. import bash, pype_tasks
 from ..util.system import only_these_symlinks
-from pypeflow.pwatcher_bridge import PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase
-from pypeflow.data import makePypeLocalFile, fn
-from pypeflow.task import PypeTask
+#from pypeflow.pwatcher_bridge import PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase
+#from pypeflow.data import makePypeLocalFile, fn
+#from pypeflow.task import PypeTask
 from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
         makePypeLocalFile, fn, PypeTask)
 import argparse
@@ -133,7 +133,6 @@ def main1(prog_name, input_config_fn, logger_config_fn=None):
         fc_run_logger.exception('Failed to parse config "{}".'.format(input_config_fn))
         raise
     input_fofn_plf = makePypeLocalFile(config['input_fofn'])
-    #Workflow = PypeProcWatcherWorkflow
     wf = PypeProcWatcherWorkflow(job_type=config['job_type'],
                                  job_queue=config['job_queue'],
                                  sge_option=config.get('sge_option', ''),
@@ -142,13 +141,12 @@ def main1(prog_name, input_config_fn, logger_config_fn=None):
     run(wf, config,
         os.path.abspath(input_config_fn),
         input_fofn_plf=input_fofn_plf,
-        setNumThreadAllowed=PypeProcWatcherWorkflow.setNumThreadAllowed)
+        )
 
 
 def run(wf, config,
         input_config_fn,
         input_fofn_plf,
-        setNumThreadAllowed,
         ):
     """
     Preconditions (for now):
diff --git a/FALCON/test/test_functional.py b/FALCON/test/test_functional.py
index 832ba1f..aafb6d4 100644
--- a/FALCON/test/test_functional.py
+++ b/FALCON/test/test_functional.py
@@ -30,7 +30,6 @@ def test_get_daligner_job_descriptions_small():
 
 example_se161 = os.path.join(thisdir, 'se161.sh')
 def test_get_daligner_job_descriptions_se161():
-    # when there is only 1 block, a special case
     example_HPCdaligner = open(example_se161)
     result = f.get_daligner_job_descriptions(
         example_HPCdaligner, 'raw_reads', single=False)
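After this commit the entry points all build their workflow the same way. A
condensed sketch of the pattern; the keyword arguments appear verbatim in the
hunks above, while the config values here are placeholders, not defaults from
the codebase:

    # Sketch of the post-refactor workflow setup (placeholder config values).
    from pypeflow.simple_pwatcher_bridge import PypeProcWatcherWorkflow

    config = {'job_type': 'local', 'job_queue': 'default',
              'pwatcher_type': 'fs_based', 'pwatcher_directory': 'mypwatcher'}
    wf = PypeProcWatcherWorkflow(job_type=config['job_type'],
                                 job_queue=config['job_queue'],
                                 sge_option=config.get('sge_option', ''),
                                 watcher_type=config['pwatcher_type'],
                                 watcher_directory=config['pwatcher_directory'])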
- -""" - -PypeCommon: provide the common base classes and general module level utility functions - and constants for PypeEngine - -""" - -from urlparse import urlparse - -import rdflib -try: - from rdflib import ConjunctiveGraph as Graph #work for rdflib-3.1.0 - # need to install rdfextras for rdflib-3.0.0 - rdflib.plugin.register('sparql', rdflib.query.Processor, - 'rdfextras.sparql.processor', 'Processor') - rdflib.plugin.register('sparql', rdflib.query.Result, - 'rdfextras.sparql.query', 'SPARQLQueryResult') -except Exception: - from rdflib.Graph import ConjunctiveGraph as Graph #work for rdflib-2.4.2 -from rdflib import Namespace -from rdflib import Literal -from rdflib import URIRef - -from subprocess import Popen, PIPE -import time - -pypeNS = Namespace("pype://v0.1/") - -class PypeError(Exception): - def __init__(self, msg): - Exception.__init__(self, msg) # to make __repr__() show class name - self.msg = msg - def __str__(self): - return repr(self.msg) - -class NotImplementedError(PypeError): - pass - -class URLSchemeNotSupportYet(PypeError): - pass - -class PypeObject(object): - - """ - - Base class for all PypeObjects - - Every PypeObject should have an URL. - The instance attributes can be set by using keyword argument in __init__(). - - """ - - def __init__(self, URL, **attributes): - - URLParseResult = urlparse(URL) - if URLParseResult.scheme not in self.__class__.supportedURLScheme: - raise URLSchemeNotSupportYet("%s is not supported yet" % URLParseResult.scheme ) - else: - self.URL = URL - for k,v in attributes.iteritems(): - if k not in self.__dict__: - self.__dict__[k] = v - - def _updateURL(self, newURL): - URLParseResult = urlparse(self.URL) - newURLParseResult = urlparse(newURL) - if URLParseResult.scheme != newURLParseResult.scheme: - raise PypeError, "the URL scheme can not be changed for obj %s" % self.URL - self.URL = newURL - - @property - def _RDFGraph(self): - graph = Graph() - - for k, v in self.__dict__.iteritems(): - if k == "URL": continue - if k[0] == "_": continue - if hasattr(v, "URL"): - graph.add( ( URIRef(self.URL), pypeNS[k], URIRef(v.URL) ) ) - return graph - - - - @property - def RDFXML(self): - - """ - RDF XML representation of the everything related to the PypeObject - """ - - return self._RDFGraph.serialize() - -def runShellCmd(args,**kwargs): - - """ - Utility function that runs a shell script command. - It blocks until the command is finished. The return value - from the shell command is returned - """ - - p = Popen(args,**kwargs) - pStatus = None - while 1: - time.sleep(0.2) - pStatus = p.poll() - if pStatus != None: - break - return pStatus - -def runSgeSyncJob(args): - - """ - Utility function that runs a shell script with SGE. - It blocks until the command is finished. 
diff --git a/pypeFLOW/pypeflow/controller.py b/pypeFLOW/pypeflow/controller.py
deleted file mode 100644
index ce39e89..0000000
--- a/pypeFLOW/pypeflow/controller.py
+++ /dev/null
@@ -1,875 +0,0 @@
-# @author Jason Chin
-#
-# Copyright (C) 2010 by Jason Chin
-# Copyright (C) 2011 by Jason Chin, Pacific Biosciences
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-
-PypeController: This module provides the PypeWorkflow that controlls how a workflow is excuted.
-
-"""
-import sys
-import datetime
-import multiprocessing
-import threading
-import time
-import traceback
-import logging
-import Queue
-from cStringIO import StringIO
-from urlparse import urlparse
-
-# TODO(CD): When we stop using Python 2.5, use relative-imports and remove this dir from PYTHONPATH.
-from common import PypeError, PypeObject, Graph, URIRef, pypeNS
-from data import PypeDataObjectBase, PypeSplittableLocalFile
-from task import PypeTaskBase, PypeTaskCollection, PypeThreadTaskBase, getFOFNMapTasks
-from task import TaskInitialized, TaskDone, TaskFail
-
-logger = logging.getLogger(__name__)
-
-class TaskExecutionError(PypeError):
-    pass
-class TaskTypeError(PypeError):
-    pass
-class TaskFailureError(PypeError):
-    pass
-class LateTaskFailureError(PypeError):
-    pass
-
-class PypeNode(object):
-    """
-    Representing a node in the dependence DAG.
-    """
-
-    def __init__(self, obj):
-        self.obj = obj
-        self._outNodes = set()
-        self._inNodes = set()
-
-    def addAnOutNode(self, obj):
-        self._outNodes.add(obj)
-
-    def addAnInNode(self, obj):
-        self._inNodes.add(obj)
-
-    def removeAnOutNode(self, obj):
-        self._outNodes.remove(obj)
-
-    def removeAnInNode(self, obj):
-        self._inNodes.remove(obj)
-
-    @property
-    def inDegree(self):
-        return len(self._inNodes)
-
-    @property
-    def outDegree(self):
-        return len(self._outNodes)
-
-    @property
-    def depth(self):
-        if self.inDegree == 0:
-            return 1
-        return 1 + max([ node.depth for node in self._inNodes ])
-
-class PypeGraph(object):
-    """
-    Representing a dependence DAG with PypeObjects.
-    """
-
-    def __init__(self, RDFGraph, subGraphNodes=None):
-        """
-        Construct an internal DAG with PypeObject given an RDF graph.
-        A sub-graph can be constructed if subGraphNodes is not "None"
-        """
-
-        self._RDFGraph = RDFGraph
-        self._allEdges = set()
-        self._allNodes = set()
-        self.url2Node ={}
-
-        for row in self._RDFGraph.query('SELECT ?s ?o WHERE {?s pype:prereq ?o . }', initNs=dict(pype=pypeNS)):
-            if subGraphNodes != None:
-                if row[0] not in subGraphNodes: continue
-                if row[1] not in subGraphNodes: continue
-
-            sURL, oURL = str(row[0]), str(row[1])
-
-            self.url2Node[sURL] = self.url2Node.get( sURL, PypeNode(str(row[0])) )
-            self.url2Node[oURL] = self.url2Node.get( oURL, PypeNode(str(row[1])) )
-
-            n1 = self.url2Node[oURL]
-            n2 = self.url2Node[sURL]
-
-            n1.addAnOutNode(n2)
-            n2.addAnInNode(n1)
-
-            anEdge = (n1, n2)
-            self._allNodes.add( n1 )
-            self._allNodes.add( n2 )
-            self._allEdges.add( anEdge )
-
-    def __getitem__(self, url):
-        """PypeGraph["URL"] ==> PypeNode"""
-        return self.url2Node[url]
-
-    def tSort(self): #return a topoloical sort node list
-        """
-        Output topological sorted list of the graph element.
-        It raises a TeskExecutionError if a circle is detected.
-        """
-        edges = self._allEdges.copy()
-
-        S = [x for x in self._allNodes if x.inDegree == 0]
-        L = []
-        while len(S) != 0:
-            n = S.pop()
-            L.append(n)
-            outNodes = n._outNodes.copy()
-            for m in outNodes:
-                edges.remove( (n, m) )
-                n.removeAnOutNode(m)
-                m.removeAnInNode(n)
-                if m.inDegree == 0:
-                    S.append(m)
-
-        if len(edges) != 0:
-            raise TaskExecutionError(" Circle detectd in the dependency graph ")
-        else:
-            return [x.obj for x in L]
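The deleted tSort() above is Kahn's algorithm: repeatedly remove a node whose
in-degree is zero and append it to the output; any edges left over at the end
mean the graph had a cycle. A compact standalone version of the same idea,
over a plain adjacency-list dict rather than the deleted classes:

    # Sketch: Kahn's topological sort over an adjacency-list graph.
    from collections import deque

    def toposort(adj):
        indeg = {u: 0 for u in adj}
        for u in adj:
            for v in adj[u]:
                indeg[v] = indeg.get(v, 0) + 1
        ready = deque(u for u in indeg if indeg[u] == 0)
        order = []
        while ready:
            u = ready.popleft()
            order.append(u)
            for v in adj.get(u, ()):
                indeg[v] -= 1
                if indeg[v] == 0:
                    ready.append(v)
        if len(order) != len(indeg):
            raise ValueError('cycle detected')
        return order

    print toposort({'a': ['b', 'c'], 'b': ['c'], 'c': []})  # ['a', 'b', 'c']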
-class PypeWorkflow(PypeObject):
-    """
-    Representing a PypeWorkflow. PypeTask and PypeDataObjects can be added
-    into the workflow and executed through the instanct methods.
-
-    >>> import os, time
-    >>> from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn
-    >>> from pypeflow.task import *
-    >>> try:
-    ...     os.makedirs("/tmp/pypetest")
-    ...     _ = os.system("rm -f /tmp/pypetest/*")
-    ... except Exception:
-    ...     pass
-    >>> time.sleep(1)
-    >>> fin = makePypeLocalFile("/tmp/pypetest/testfile_in", readOnly=False)
-    >>> fout = makePypeLocalFile("/tmp/pypetest/testfile_out", readOnly=False)
-    >>> @PypeTask(outputDataObjs={"test_out":fout},
-    ...           inputDataObjs={"test_in":fin},
-    ...           parameters={"a":'I am "a"'}, **{"b":'I am "b"'})
-    ... def test(self):
-    ...     print test.test_in.localFileName
-    ...     print test.test_out.localFileName
-    ...     os.system( "touch %s" % fn(test.test_out) )
-    ...     pass
-    >>> os.system( "touch %s" % (fn(fin)) )
-    0
-    >>> from pypeflow.controller import PypeWorkflow
-    >>> wf = PypeWorkflow()
-    >>> wf.addTask(test)
-    >>> def finalize(self):
-    ...     def f():
-    ...         print "in finalize:", self._status
-    ...     return f
-    >>> test.finalize = finalize(test) # For testing only. Please don't do this in your code. The PypeTask.finalized() is intended to be overriden by subclasses.
-    >>> wf.refreshTargets( objs = [fout] )
-    /tmp/pypetest/testfile_in
-    /tmp/pypetest/testfile_out
-    in finalize: done
-    True
-    """
-
-    supportedURLScheme = ["workflow"]
-
-    def __init__(self, URL = None, **attributes ):
-
-        if URL == None:
-            URL = "workflow://" + __file__+"/%d" % id(self)
-
-        self._pypeObjects = {}
-
-        PypeObject.__init__(self, URL, **attributes)
-
-        self._referenceRDFGraph = None #place holder for a reference RDF
-
-
-    def addObject(self, obj):
-        self.addObjects([obj])
-
-    def addObjects(self, objs):
-        """
-        Add data objects into the workflow. One can add also task object to the workflow using this method for
-        non-threaded workflow.
-        """
-        for obj in objs:
-            if obj.URL in self._pypeObjects:
-                if id(self._pypeObjects[obj.URL]) != id(obj):
-                    raise PypeError, "Add different objects with the same URL %s" % obj.URL
-                else:
-                    continue
-            self._pypeObjects[obj.URL] = obj
-
-    def addTask(self, taskObj):
-        self.addTasks([taskObj])
-
-
-    def addTasks(self, taskObjs):
-        """
-        Add tasks into the workflow. The dependent input and output data objects are added automatically too.
-        It sets the message queue used for communicating between the task thread and the main thread. One has
-        to use addTasks() or addTask() to add task objects to a threaded workflow.
-        """
-        for taskObj in taskObjs:
-            if isinstance(taskObj, PypeTaskCollection):
-                for subTaskObj in taskObj.getTasks() + taskObj.getScatterGatherTasks():
-                    self.addObjects(subTaskObj.inputDataObjs.values())
-                    self.addObjects(subTaskObj.outputDataObjs.values())
-                    self.addObjects(subTaskObj.mutableDataObjs.values())
-                    self.addObject(subTaskObj)
-
-            else:
-                for dObj in taskObj.inputDataObjs.values() +\
-                            taskObj.outputDataObjs.values() +\
-                            taskObj.mutableDataObjs.values() :
-                    if isinstance(dObj, PypeSplittableLocalFile):
-                        self.addObjects([dObj._completeFile])
-                    self.addObjects([dObj])
-
-                self.addObject(taskObj)
-
-
-    def removeTask(self, taskObj):
-        self.removeTasks([taskObj])
-
-    def removeTasks(self, taskObjs ):
-        """
-        Remove tasks from the workflow.
-        """
-        self.removeObjects(taskObjs)
-
-    def removeObjects(self, objs):
-        """
-        Remove objects from the workflow. If the object cannot be found, a PypeError is raised.
-        """
-        for obj in objs:
-            if obj.URL in self._pypeObjects:
-                del self._pypeObjects[obj.URL]
-            else:
-                raise PypeError, "Unable to remove %s from the graph. (Object not found)" % obj.URL
-
-    def updateURL(self, oldURL, newURL):
-        obj = self._pypeObjects[oldURL]
-        obj._updateURL(newURL)
-        self._pypeObjects[newURL] = obj
-        del self._pypeObjects[oldURL]
-
-
-
-    @property
-    def _RDFGraph(self):
-        # expensive to recompute
-        graph = Graph()
-        for URL, obj in self._pypeObjects.iteritems():
-            for s,p,o in obj._RDFGraph:
-                graph.add( (s,p,o) )
-        return graph
-
-    def setReferenceRDFGraph(self, fn):
-        self._referenceRDFGraph = Graph()
-        self._referenceRDFGraph.load(fn)
-        refMD5s = self._referenceRDFGraph.subject_objects(pypeNS["codeMD5digest"])
-        for URL, md5digest in refMD5s:
-            obj = self._pypeObjects[str(URL)]
-            obj.setReferenceMD5(md5digest)
-
-    def _graphvizDot(self, shortName=False):
-        graph = self._RDFGraph
-        dotStr = StringIO()
-        shapeMap = {"file":"box", "state":"box", "task":"component"}
-        colorMap = {"file":"yellow", "state":"cyan", "task":"green"}
-        dotStr.write( 'digraph "%s" {\n rankdir=LR;' % self.URL)
-        for URL in self._pypeObjects.keys():
-            URLParseResult = urlparse(URL)
-            if URLParseResult.scheme not in shapeMap:
-                continue
-            else:
-                shape = shapeMap[URLParseResult.scheme]
-                color = colorMap[URLParseResult.scheme]
-
-                s = URL
-                if shortName == True:
-                    s = URLParseResult.scheme + "://..." + URLParseResult.path.split("/")[-1]
-                dotStr.write( '"%s" [shape=%s, fillcolor=%s, style=filled];\n' % (s, shape, color))
-
-        for row in graph.query('SELECT ?s ?o WHERE {?s pype:prereq ?o . }', initNs=dict(pype=pypeNS)):
-            s, o = row
-            if shortName == True:
-                s = urlparse(s).scheme + "://..." + urlparse(s).path.split("/")[-1]
-                o = urlparse(o).scheme + "://..." + urlparse(o).path.split("/")[-1]
-            dotStr.write( '"%s" -> "%s";\n' % (o, s))
-        for row in graph.query('SELECT ?s ?o WHERE {?s pype:hasMutable ?o . }', initNs=dict(pype=pypeNS)):
-            s, o = row
-            if shortName == True:
-                s = urlparse(s).scheme + "://..." + urlparse(s).path.split("/")[-1]
-                o = urlparse(o).scheme + "://..." + urlparse(o).path.split("/")[-1]
-            dotStr.write( '"%s" -- "%s" [arrowhead=both, style=dashed ];\n' % (s, o))
-        dotStr.write ("}")
-        return dotStr.getvalue()
-
-    @property
-    def graphvizDot(self):
-        return self._graphvizDot()
-
-    @property
-    def graphvizShortNameDot(self):
-        return self._graphvizDot(shortName = True)
-
-    @property
-    def makeFileStr(self):
-        """
-        generate a string that has the information of the execution dependency in
-        a "Makefile" like format. It can be written into a "Makefile" and
-        executed by "make".
-        """
-        for URL in self._pypeObjects.keys():
-            URLParseResult = urlparse(URL)
-            if URLParseResult.scheme != "task": continue
-            taskObj = self._pypeObjects[URL]
-            if not hasattr(taskObj, "script"):
-                raise TaskTypeError("can not convert non shell script based workflow to a makefile")
-        makeStr = StringIO()
-        for URL in self._pypeObjects.keys():
-            URLParseResult = urlparse(URL)
-            if URLParseResult.scheme != "task": continue
-            taskObj = self._pypeObjects[URL]
-            inputFiles = taskObj.inputDataObjs
-            outputFiles = taskObj.outputDataObjs
-            #for oStr in [o.localFileName for o in outputFiles.values()]:
-            if 1:
-                oStr = " ".join( [o.localFileName for o in outputFiles.values()])
-
-                iStr = " ".join([i.localFileName for i in inputFiles.values()])
-                makeStr.write( "%s:%s\n" % ( oStr, iStr ) )
-                makeStr.write( "\t%s\n\n" % taskObj.script )
-        makeStr.write("all: %s" % " ".join([o.localFileName for o in outputFiles.values()]) )
-        return makeStr.getvalue()
-
-    @staticmethod
-    def getSortedURLs(rdfGraph, objs):
-        if len(objs) != 0:
-            connectedPypeNodes = set()
-            for obj in objs:
-                if isinstance(obj, PypeSplittableLocalFile):
-                    obj = obj._completeFile
-                for x in rdfGraph.transitive_objects(URIRef(obj.URL), pypeNS["prereq"]):
-                    connectedPypeNodes.add(x)
-            tSortedURLs = PypeGraph(rdfGraph, connectedPypeNodes).tSort( )
-        else:
-            tSortedURLs = PypeGraph(rdfGraph).tSort( )
-        return tSortedURLs
-
-    def refreshTargets(self, objs = [], callback = (None, None, None) ):
-        """
-        Execute the DAG to reach all objects in the "objs" argument.
-        """
-        tSortedURLs = self.getSortedURLs(self._RDFGraph, objs)
-        for URL in tSortedURLs:
-            obj = self._pypeObjects[URL]
-            if not isinstance(obj, PypeTaskBase):
-                continue
-            else:
-                obj()
-                obj.finalize()
-        self._runCallback(callback)
-        return True
-
-    def _runCallback(self, callback = (None, None, None ) ):
-        if callback[0] != None and callable(callback[0]):
-            argv = []
-            kwargv = {}
-            if callback[1] != None and isinstance( callback[1], type(list()) ):
-                argv = callback[1]
-            else:
-                raise TaskExecutionError( "callback argument type error")
-
-            if callback[2] != None and isinstance( callback[1], type(dict()) ):
-                kwargv = callback[2]
-            else:
-                raise TaskExecutionError( "callback argument type error")
-
-            callback[0](*argv, **kwargv)
-
-        elif callback[0] != None:
-            raise TaskExecutionError( "callback is not callable")
-
-    @property
-    def dataObjects( self ):
-        return [ o for o in self._pypeObjects.values( ) if isinstance( o, PypeDataObjectBase )]
-
-    @property
-    def tasks( self ):
-        return [ o for o in self._pypeObjects.values( ) if isinstance( o, PypeTaskBase )]
-
-    @property
-    def inputDataObjects(self):
-        graph = self._RDFGraph
-        inputObjs = []
-        for obj in self.dataObjects:
-            r = graph.query('SELECT ?o WHERE {<%s> pype:prereq ?o . }' % obj.URL, initNs=dict(pype=pypeNS))
-            if len(r) == 0:
-                inputObjs.append(obj)
-        return inputObjs
-
-    @property
-    def outputDataObjects(self):
-        graph = self._RDFGraph
-        outputObjs = []
-        for obj in self.dataObjects:
-            r = graph.query('SELECT ?s WHERE {?s pype:prereq <%s> . }' % obj.URL, initNs=dict(pype=pypeNS))
-            if len(r) == 0:
-                outputObjs.append(obj)
-        return outputObjs
-
-def PypeMPWorkflow(URL = None, **attributes):
-    """Factory for the workflow using multiprocessing.
-    """
-    th = _PypeProcsHandler()
-    mq = multiprocessing.Queue()
-    se = multiprocessing.Event()
-    return _PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se,
-            attributes=attributes)
-
-def PypeThreadWorkflow(URL = None, **attributes):
-    """Factory for the workflow using threading.
-    """
-    th = _PypeThreadsHandler()
-    mq = Queue.Queue()
-    se = threading.Event()
-    return _PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se,
-            attributes=attributes)
-
-class _PypeConcurrentWorkflow(PypeWorkflow):
-    """
-    Representing a PypeWorkflow that can excute tasks concurrently using threads. It
-    assume all tasks block until they finish. PypeTask and PypeDataObjects can be added
-    into the workflow and executed through the instance methods.
-    """
-
-    CONCURRENT_THREAD_ALLOWED = 16
-    MAX_NUMBER_TASK_SLOT = CONCURRENT_THREAD_ALLOWED
-
-    @classmethod
-    def setNumThreadAllowed(cls, nT, nS):
-        """
-        Override the default number of threads used to run the tasks with this method.
-        """
-        cls.CONCURRENT_THREAD_ALLOWED = nT
-        cls.MAX_NUMBER_TASK_SLOT = nS
-
-    def __init__(self, URL, thread_handler, messageQueue, shutdown_event, attributes):
-        PypeWorkflow.__init__(self, URL, **attributes )
-        self.thread_handler = thread_handler
-        self.messageQueue = messageQueue
-        self.shutdown_event = shutdown_event
-        self.jobStatusMap = dict()
-
-    def addTasks(self, taskObjs):
-        """
-        Add tasks into the workflow. The dependent input and output data objects are added automatically too.
-        It sets the message queue used for communicating between the task thread and the main thread. One has
-        to use addTasks() or addTask() to add task objects to a threaded workflow.
-        """
-        for taskObj in taskObjs:
-            if isinstance(taskObj, PypeTaskCollection):
-                for subTaskObj in taskObj.getTasks() + taskObj.getScatterGatherTasks():
-                    if not isinstance(subTaskObj, PypeThreadTaskBase):
-                        raise TaskTypeError("Only PypeThreadTask can be added into a PypeThreadWorkflow. The task object %s has type %s " % (subTaskObj.URL, repr(type(subTaskObj))))
-                    subTaskObj.setMessageQueue(self.messageQueue)
-                    subTaskObj.setShutdownEvent(self.shutdown_event)
-            else:
-                if not isinstance(taskObj, PypeThreadTaskBase):
-                    raise TaskTypeError("Only PypeThreadTask can be added into a PypeThreadWorkflow. The task object has type %s " % repr(type(taskObj)))
-                taskObj.setMessageQueue(self.messageQueue)
-                taskObj.setShutdownEvent(self.shutdown_event)
-
-        PypeWorkflow.addTasks(self, taskObjs)
-
-    def refreshTargets(self, objs=None,
-                       callback=(None, None, None),
-                       updateFreq=None,
-                       exitOnFailure=True):
-        if objs is None:
-            objs = []
-        task2thread = {}
-        try:
-            rtn = self._refreshTargets(task2thread, objs = objs, callback = callback, updateFreq = updateFreq, exitOnFailure = exitOnFailure)
-            return rtn
-        except:
-            tb = traceback.format_exc()
-            self.shutdown_event.set()
-            logger.critical("Any exception caught in RefreshTargets() indicates an unrecoverable error. Shutting down...")
-            shutdown_msg = """
-            "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
-            "! Please wait for all threads / processes to terminate !"
-            "! Also, maybe use 'ps' or 'qstat' to check all threads,!"
-            "! processes and/or jobs are terminated cleanly.        !"
-            "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
-            """
-            import warnings
-            warnings.warn(shutdown_msg)
-            th = self.thread_handler
-            threads = list(task2thread.values())
-            logger.warning("#tasks=%d, #alive=%d" %(len(threads), th.alive(threads)))
-            try:
-                while th.alive(threads):
-                    th.join(threads, 2)
-                    logger.warning("Now, #tasks=%d, #alive=%d" %(len(threads), th.alive(threads)))
-            except (KeyboardInterrupt, SystemExit) as e:
-                logger.debug("Interrupted while joining threads (while handling exception from RefreshTargets()). Trying to terminate any working processes before final exit.")
-                th.notifyTerminate(threads)
-            raise Exception('Caused by:\n' + tb)
-
-
-    def _refreshTargets(self, task2thread, objs,
-                        callback,
-                        updateFreq,
-                        exitOnFailure):
-        thread = self.thread_handler.create
-
-        rdfGraph = self._RDFGraph # expensive to recompute, should not change during execution
-        tSortedURLs = self.getSortedURLs(rdfGraph, objs)
-
-        sortedTaskList = [ (str(u), self._pypeObjects[u], self._pypeObjects[u].getStatus()) for u in tSortedURLs
-                            if isinstance(self._pypeObjects[u], PypeTaskBase) ]
-        self.jobStatusMap = dict( ( (t[0], t[2]) for t in sortedTaskList ) )
-        logger.info("# of tasks in complete graph: %d" %(
-            len(sortedTaskList),
-            ))
-
-        prereqJobURLMap = {}
-
-        for URL, taskObj, tStatus in sortedTaskList:
-            prereqJobURLs = [str(u) for u in rdfGraph.transitive_objects(URIRef(URL), pypeNS["prereq"])
-                                    if isinstance(self._pypeObjects[str(u)], PypeTaskBase) and str(u) != URL ]
-
-            prereqJobURLMap[URL] = prereqJobURLs
-
-            logger.debug("Determined prereqs for %r to be %r" % (URL, ", ".join(prereqJobURLs)))
-
-            if taskObj.nSlots > self.MAX_NUMBER_TASK_SLOT:
-                raise TaskExecutionError("%s requests more %s task slots which is more than %d task slots allowed" %
-                                          (str(URL), taskObj.nSlots, self.MAX_NUMBER_TASK_SLOT) )
-
-        sleep_time = 0
-        nSubmittedJob = 0
-        usedTaskSlots = 0
-        loopN = 0
-        lastUpdate = None
-        activeDataObjs = set() #keep a set of output data object. repeats are illegal.
-        mutableDataObjs = set() #keep a set of mutable data object. a task will be delayed if a running task has the same output.
-        updatedTaskURLs = set() #to avoid extra stat-calls
-        failedJobCount = 0
-        succeededJobCount = 0
-        jobsReadyToBeSubmitted = []
-
-        while 1:
-
-            loopN += 1
-            if not ((loopN - 1) & loopN):
-                # exponential back-off for logging
-                logger.info("tick: %d, #updatedTasks: %d, sleep_time=%f" %(loopN, len(updatedTaskURLs), sleep_time))
-
-            for URL, taskObj, tStatus in sortedTaskList:
-                if self.jobStatusMap[URL] != TaskInitialized:
-                    continue
-                logger.debug(" #outputDataObjs: %d; #mutableDataObjs: %d" %(
-                    len(taskObj.outputDataObjs.values()),
-                    len(taskObj.mutableDataObjs.values()),
-                    ))
-                prereqJobURLs = prereqJobURLMap[URL]
-
-                logger.debug(' preqs of %s:' %URL)
-                for u in prereqJobURLs:
-                    logger.debug('  %s: %s' %(self.jobStatusMap[u], u))
-                if any(self.jobStatusMap[u] != "done" for u in prereqJobURLs):
-                    # Note: If self.jobStatusMap[u] raises, then the sorting was wrong.
-                    #logger.debug('Prereqs not done! %s' %URL)
-                    continue
-                # Check for mutable collisions; delay task if any.
-                outputCollision = False
-                for dataObj in taskObj.mutableDataObjs.values():
-                    for fromTaskObjURL, mutableDataObjURL in mutableDataObjs:
-                        if dataObj.URL == mutableDataObjURL and taskObj.URL != fromTaskObjURL:
-                            logger.debug("mutable output collision detected for data object %r betw %r and %r" %(
-                                dataObj, dataObj.URL, mutableDataObjURL))
-                            outputCollision = True
-                            break
-                if outputCollision:
-                    continue
-                # Check for illegal collisions.
-                if len(activeDataObjs) < 100:
-                    # O(n^2) on active tasks, but pretty fast.
-                    for dataObj in taskObj.outputDataObjs.values():
-                        for fromTaskObjURL, activeDataObjURL in activeDataObjs:
-                            if dataObj.URL == activeDataObjURL and taskObj.URL != fromTaskObjURL:
-                                raise Exception("output collision detected for data object %r betw %r and %r" %(
-                                    dataObj, dataObj.URL, activeDataObjURL))
-                # We use 'updatedTaskURLs' to short-circuit 'isSatisfied()', to avoid many stat-calls.
-                # Note: Sorting should prevent FileNotExistError in isSatisfied().
-                if not (set(prereqJobURLs) & updatedTaskURLs) and taskObj.isSatisfied():
-                    #taskObj.setStatus(pypeflow.task.TaskDone) # Safe b/c thread is not running yet, and all prereqs are done.
-                    logger.info(' Skipping already done task: %s' %(URL,))
-                    logger.debug('  (Status was %s)' %(self.jobStatusMap[URL],))
-                    taskObj.setStatus(TaskDone) # to avoid re-stat on subsequent call to refreshTargets()
-                    self.jobStatusMap[str(URL)] = TaskDone # to avoid re-stat on *this* call
-                    successfullTask = self._pypeObjects[URL]
-                    successfullTask.finalize()
-                    continue
-                self.jobStatusMap[str(URL)] = "ready" # in case not all ready jobs are given threads immediately, to avoid re-stat
-                jobsReadyToBeSubmitted.append( (URL, taskObj) )
-                for dataObj in taskObj.outputDataObjs.values():
-                    logger.debug( "add active data obj: %s" %(dataObj,))
-                    activeDataObjs.add( (taskObj.URL, dataObj.URL) )
-                for dataObj in taskObj.mutableDataObjs.values():
-                    logger.debug( "add mutable data obj: %s" %(dataObj,))
-                    mutableDataObjs.add( (taskObj.URL, dataObj.URL) )
-
-            logger.debug( "#jobsReadyToBeSubmitted: %d" % len(jobsReadyToBeSubmitted) )
-
-            numAliveThreads = self.thread_handler.alive(task2thread.values())
-            logger.debug( "Total # of running threads: %d; alive tasks: %d; sleep=%f, loopN=%d" % (
-                threading.activeCount(), numAliveThreads, sleep_time, loopN) )
-            if numAliveThreads == 0 and len(jobsReadyToBeSubmitted) == 0 and self.messageQueue.empty():
-                #TODO: better job status detection. messageQueue should be empty and all return condition should be "done", or "fail"
-                logger.info( "_refreshTargets() finished with no thread running and no new job to submit" )
-                for URL in task2thread:
-                    assert self.jobStatusMap[str(URL)] in ("done", "fail"), "status(%s)==%r" %(
-                        URL, self.jobStatusMap[str(URL)])
-                break # End of loop!
-
-            while jobsReadyToBeSubmitted:
-                URL, taskObj = jobsReadyToBeSubmitted[0]
-                numberOfEmptySlot = self.MAX_NUMBER_TASK_SLOT - usedTaskSlots
-                logger.debug( "#empty_slots = %d/%d; #jobs_ready=%d" % (numberOfEmptySlot, self.MAX_NUMBER_TASK_SLOT, len(jobsReadyToBeSubmitted)))
-                if numberOfEmptySlot >= taskObj.nSlots and numAliveThreads < self.CONCURRENT_THREAD_ALLOWED:
-                    t = thread(target = taskObj)
-                    t.start()
-                    task2thread[URL] = t
-                    nSubmittedJob += 1
-                    usedTaskSlots += taskObj.nSlots
-                    numAliveThreads += 1
-                    self.jobStatusMap[URL] = "submitted"
-                    # Note that we re-submit completed tasks whenever refreshTargets() is called.
-                    logger.debug("Submitted %r" %URL)
-                    logger.debug(" Details: %r" %taskObj)
-                    jobsReadyToBeSubmitted.pop(0)
-                else:
-                    break
-
-            time.sleep(sleep_time)
-            if updateFreq != None:
-                elapsedSeconds = updateFreq if lastUpdate==None else (datetime.datetime.now()-lastUpdate).seconds
-                if elapsedSeconds >= updateFreq:
-                    self._update( elapsedSeconds )
-                    lastUpdate = datetime.datetime.now( )
-
-            sleep_time = sleep_time + 0.1 if (sleep_time < 1) else 1
-            while not self.messageQueue.empty():
-                sleep_time = 0 # Wait very briefly while messages are coming in.
-                URL, message = self.messageQueue.get()
-                updatedTaskURLs.add(URL)
-                self.jobStatusMap[str(URL)] = message
-                logger.debug("message for %s: %r" %(URL, message))
-
-                if message in ["done"]:
-                    successfullTask = self._pypeObjects[str(URL)]
-                    nSubmittedJob -= 1
-                    usedTaskSlots -= successfullTask.nSlots
-                    logger.info("Success (%r). Joining %r..." %(message, URL))
-                    task2thread[URL].join(timeout=10)
-                    #del task2thread[URL]
-                    succeededJobCount += 1
-                    successfullTask.finalize()
-                    for o in successfullTask.outputDataObjs.values():
-                        activeDataObjs.remove( (successfullTask.URL, o.URL) )
-                    for o in successfullTask.mutableDataObjs.values():
-                        mutableDataObjs.remove( (successfullTask.URL, o.URL) )
-                elif message in ["fail"]:
-                    failedTask = self._pypeObjects[str(URL)]
-                    nSubmittedJob -= 1
-                    usedTaskSlots -= failedTask.nSlots
-                    logger.info("Failure (%r). Joining %r..." %(message, URL))
-                    task2thread[URL].join(timeout=10)
-                    #del task2thread[URL]
-                    failedJobCount += 1
-                    failedTask.finalize()
-                    for o in failedTask.outputDataObjs.values():
-                        activeDataObjs.remove( (failedTask.URL, o.URL) )
-                    for o in failedTask.mutableDataObjs.values():
-                        mutableDataObjs.remove( (failedTask.URL, o.URL) )
-                elif message in ["started, runflag: 1"]:
-                    logger.info("Queued %s ..." %repr(URL))
-                elif message in ["started, runflag: 0"]:
-                    logger.debug("Queued %s (already completed) ..." %repr(URL))
-                    raise Exception('It should not be possible to start an already completed task.')
-                else:
-                    logger.warning("Got unexpected message %r from URL %r." %(message, URL))
-
-            for u,s in sorted(self.jobStatusMap.items()):
-                logger.debug("task status: %r, %r, used slots: %d" % (str(u),str(s), self._pypeObjects[str(u)].nSlots))
-
-            if failedJobCount != 0 and (exitOnFailure or succeededJobCount == 0):
-                raise TaskFailureError("Counted %d failure(s) with 0 successes so far." %failedJobCount)
-
-
-        for u,s in sorted(self.jobStatusMap.items()):
-            logger.debug("task status: %s, %r" % (u, s))
-
-        self._runCallback(callback)
-        if failedJobCount != 0:
-            # Slightly different exception when !exitOnFailure.
-            raise LateTaskFailureError("Counted a total of %d failure(s) and %d success(es)." %(
-                failedJobCount, succeededJobCount))
-        return True #TODO: There is no reason to return anything anymore.
-
-    def _update(self, elapsed):
-        """Can be overridden to provide timed updates during execution"""
-        pass
-
-    def _graphvizDot(self, shortName=False):
-
-        graph = self._RDFGraph
-        dotStr = StringIO()
-        shapeMap = {"file":"box", "state":"box", "task":"component"}
-        colorMap = {"file":"yellow", "state":"cyan", "task":"green"}
-        dotStr.write( 'digraph "%s" {\n rankdir=LR;' % self.URL)
-
-
-        for URL in self._pypeObjects.keys():
-            URLParseResult = urlparse(URL)
-            if URLParseResult.scheme not in shapeMap:
-                continue
-            else:
-                shape = shapeMap[URLParseResult.scheme]
-                color = colorMap[URLParseResult.scheme]
-
-                s = URL
-                if shortName == True:
-                    s = URLParseResult.scheme + "://..." + URLParseResult.path.split("/")[-1]
-
-                if URLParseResult.scheme == "task":
-                    jobStatus = self.jobStatusMap.get(URL, None)
-                    if jobStatus != None:
-                        if jobStatus == "fail":
-                            color = 'red'
-                        elif jobStatus == "done":
-                            color = 'green'
-                    else:
-                        color = 'white'
-
-                dotStr.write( '"%s" [shape=%s, fillcolor=%s, style=filled];\n' % (s, shape, color))
-
-        for row in graph.query('SELECT ?s ?o WHERE {?s pype:prereq ?o . }', initNs=dict(pype=pypeNS)):
-            s, o = row
-            if shortName == True:
-                s = urlparse(s).scheme + "://..." + urlparse(s).path.split("/")[-1]
-                o = urlparse(o).scheme + "://..." + urlparse(o).path.split("/")[-1]
-            dotStr.write( '"%s" -> "%s";\n' % (o, s))
-        for row in graph.query('SELECT ?s ?o WHERE {?s pype:hasMutable ?o . }', initNs=dict(pype=pypeNS)):
-            s, o = row
-            if shortName == True:
-                s = urlparse(s).scheme + "://..." + urlparse(s).path.split("/")[-1]
-                o = urlparse(o).scheme + "://..." + urlparse(o).path.split("/")[-1]
-            dotStr.write( '"%s" -- "%s" [arrowhead=both, style=dashed ];\n' % (s, o))
-        dotStr.write ("}")
-        return dotStr.getvalue()
-
-# For a class-method:
-PypeThreadWorkflow.setNumThreadAllowed = _PypeConcurrentWorkflow.setNumThreadAllowed
-PypeMPWorkflow.setNumThreadAllowed = _PypeConcurrentWorkflow.setNumThreadAllowed
-
-class _PypeThreadsHandler(object):
-    """Stateless method delegator, for injection.
-    """
-    def create(self, target):
-        thread = threading.Thread(target=target)
-        thread.daemon = True # so it will terminate on exit
-        return thread
-    def alive(self, threads):
-        return sum(thread.is_alive() for thread in threads)
-    def join(self, threads, timeout):
-        then = datetime.datetime.now()
-        for thread in threads:
-            assert thread is not threading.current_thread()
-            if thread.is_alive():
-                to = max(0, timeout - (datetime.datetime.now() - then).seconds)
-                thread.join(to)
-    def notifyTerminate(self, threads):
-        """Assume these are daemon threads.
-        We will attempt to join them all quickly, but non-daemon threads may
-        eventually block the program from quitting.
-        """
-        self.join(threads, 1)
-
-class _PypeProcsHandler(object):
-    """Stateless method delegator, for injection.
-    """
-    def create(self, target):
-        proc = multiprocessing.Process(target=target)
-        return proc
-    def alive(self, procs):
-        return sum(proc.is_alive() for proc in procs)
-    def join(self, procs, timeout):
-        then = datetime.datetime.now()
-        for proc in procs:
-            if proc.is_alive():
-                proc.join((datetime.datetime.now() - then).seconds)
-    def notifyTerminate(self, procs):
-        """This can orphan sub-processes.
- """ - for proc in procs: - if proc.is_alive(): - proc.terminate() - - -def defaultOutputTemplate(fn): - return fn + ".out" - -def applyFOFN( task_fun = None, - fofonFileName = None, - outTemplateFunc = defaultOutputTemplate, - nproc = 8 ): - - tasks = getFOFNMapTasks( FOFNFileName = fofonFileName, - outTemplateFunc = outTemplateFunc, - TaskType=PypeThreadTaskBase, - parameters = dict(nSlots = 1))( task_fun ) - - wf = PypeThreadWorkflow() - wf.CONCURRENT_THREAD_ALLOWED = nproc - wf.MAX_NUMBER_TASK_SLOT = nproc - wf.addTasks(tasks) - wf.refreshTargets(exitOnFailure=False) - - -if __name__ == "__main__": - import doctest - doctest.testmod() diff --git a/pypeFLOW/pypeflow/data.py b/pypeFLOW/pypeflow/data.py deleted file mode 100644 index 43a5dc2..0000000 --- a/pypeFLOW/pypeflow/data.py +++ /dev/null @@ -1,315 +0,0 @@ - -# @author Jason Chin -# -# Copyright (C) 2010 by Jason Chin -# Copyright (C) 2011 by Jason Chin, Pacific Biosciences -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -""" - -PypeData: This module defines the general interface and class for PypeData Objects. - -""" - - -from urlparse import urlparse, urljoin -import platform -import os, shutil -from common import pypeNS, PypeObject, PypeError, NotImplementedError -import logging - -logger = logging.getLogger(__name__) - -class FileNotExistError(PypeError): - pass - -class TypeMismatchError(PypeError): - pass - -def fn(obj): - return obj.localFileName - -class PypeDataObjectBase(PypeObject): - - """ - Represent the common interface for a PypeData object. - """ - - def __init__(self, URL, **attributes): - PypeObject.__init__(self, URL, **attributes) - self.verification = [] - self._mutatble = False - - @property - def timeStamp(self): - raise NotImplementedError, self.__expr__() - - @property - def isMutable(self): - return self._mutatble - - @property - def exists(self): - raise NotImplementedError - - def addVerifyFunction( self, verifyFunction ): - self.verification.append( verifyFunction ) - - def __str__( self ): - return self.URL - - def _updateURL(self, newURL): - super(PypeDataObjectBase, self)._updateURL(newURL) - self._updatePath() - - def _updatePath(self): - URLParseResult = urlparse(self.URL) - self.localFileName = URLParseResult.path - self._path = self.localFileName #for local file, _path is the same as full local file name - -class PypeLocalFile(PypeDataObjectBase): - - """ - Represent a PypeData object that can be accessed as a file in a local - filesystem. 
- - >>> f = PypeLocalFile("file://localhost/test.txt") - >>> f.localFileName == "/test.txt" - True - >>> fn(f) - '/test.txt' - >>> f = PypeLocalFile("file://localhost/test.txt", False, isFasta = True) - >>> f.isFasta == True - True - """ - def __repr__(self): return "PypeLocalFile(%r, %r)" %(self.URL, self._path) - supportedURLScheme = ["file", "state"] - def __init__(self, URL, readOnly = False, **attributes): - PypeDataObjectBase.__init__(self, URL, **attributes) - self._updatePath() - self.readOnly = readOnly - self._mutable = attributes.get("mutable", False) - - @property - def timeStamp(self): - if not os.path.exists(self.localFileName): - logger.warning('failed: os.path.exists("%s")\n%s'%( - self.localFileName,repr(os.listdir(os.path.dirname(self.localFileName))))) - raise FileNotExistError("No such file:%r on %r" % (self.localFileName, platform.node()) ) - return os.stat(self.localFileName).st_mtime - - @property - def exists(self): - return os.path.exists(self.localFileName) - - def verify(self): - logger.debug("Verifying contents of %s" % self.URL) - # Get around the NFS problem - os.listdir(os.path.dirname(self.path)) - - errors = [ ] - for verifyFn in self.verification: - try: - errors.extend( verifyFn(self.path) ) - except Exception, e: - errors.append( str(e) ) - if len(errors) > 0: - for e in errors: - logger.error(e) - return errors - - @property - def path(self): - if self._path == None: - raise IOError, "Must resolve this file (%s) with a context " + \ - "before you can access .path" - return self._path - - def clean(self): - if os.path.exists( self.path ): - logger.info("Removing %s" % self.path ) - if os.path.isdir( self.path ): - shutil.rmtree( self.path ) - else: - os.remove( self.path ) - -class PypeHDF5Dataset(PypeDataObjectBase): #stub for now Mar 17, 2010 - - """ - Represent a PypeData object that is an HDF5 dataset. - Not implemented yet. - """ - - supportedURLScheme = ["hdf5ds"] - def __init__(self, URL, readOnly = False, **attributes): - PypeDataObjectBase.__init__(self, URL, **attributes) - URLParseResult = urlparse(URL) - self.localFileName = URLParseResult.path - #the rest of the URL goes to HDF5 DS - - -class PypeLocalFileCollection(PypeDataObjectBase): #stub for now Mar 17, 2010 - - """ - Represent a PypeData object that is a composition of multiple files. - It will provide a container that allows the tasks to choose one or all file to - process. 
- """ - - supportedURLScheme = ["files"] - def __init__(self, URL, readOnly = False, select = 1, **attributes): - """ - currently we only support select = 1, - namely, we only pass the first file add to the collection to the tasks - """ - PypeDataObjectBase.__init__(self, URL, **attributes) - URLParseResult = urlparse(URL) - self.compositedDataObjName = URLParseResult.path - self.localFileName = None - self._path = None - self.readOnly = readOnly - self.verification = [] - self.localFiles = [] # a list of all files within the obj - self.select = select - - def addLocalFile(self, pLocalFile): - if not isinstance(pLocalFile, PypeLocalFile): - raise TypeMismatchError, "only PypeLocalFile object can be added into PypeLocalFileColletion" - self.localFiles.append(pLocalFile) - if self.select == 1: - self.localFileName = self.localFiles[0].localFileName - self._path = self.localFileName - - @property - def timeStamp(self): - if self.localFileName == None: - raise PypeError, "No PypeLocalFile is added into the PypeLocalFileColletion yet" - if not os.path.exists(self.localFileName): - raise FileNotExistError("No such file:%s on %s" % (self.localFileName, platform.node()) ) - return os.stat(self.localFileName).st_mtime - - @property - def exists(self): - if self.localFileName == None: - raise PypeError, "No PypeLocalFile is added into the PypeLocalFileColletion yet" - return os.path.exists(self.localFileName) - - -class PypeSplittableLocalFile(PypeDataObjectBase): - """ - Represent a PypeData object that has two different local file - (1) the whole file (could be a virtual one) - (2) the split files - - * Such data object can have either a scatter task attached or a gather task attached. - * If a scatter task is attached, the task will be inserted to generate the scattered files. - * If a gather task is attached, the task will be inserted to generate the whole file. - * If neither scatter task nor gather task is specified, then the file is mostly like interme - Namely, the whole file representation is not used any place else. 
- * One can not specify scatter task and gather task for the same object since it will create - """ - supportedURLScheme = ["splittablefile"] - - def __init__(self, URL, readOnly = False, nChunk = 1, **attributes): - PypeDataObjectBase.__init__(self, URL, **attributes) - self._updatePath() - self.readOnly = readOnly - self.verification = [] - self._scatterTask = None - self._gatherTask = None - self._splittedFiles = [] - self.nChunk = nChunk - - URLParseResult = urlparse(self.URL) - cfURL = "file://%s%s" % (URLParseResult.netloc, URLParseResult.path) - - self._completeFile = PypeLocalFile(cfURL, readOnly, **attributes) - - dirname, basename = os.path.split(self._path) - - for i in range(nChunk): - chunkBasename = "%03d_%s" % (i, basename) - if dirname != "": - chunkURL = "file://%s%s/%s" % (URLParseResult.netloc, dirname, chunkBasename) - else: - chunkURL = "file://%s/%s" % (URLParseResult.netloc, chunkBasename) - - subfile = PypeLocalFile(chunkURL, readOnly, **attributes) - self._splittedFiles.append(subfile) - - def setGatherTask(self, TaskCreator, TaskType, function): - assert self._scatterTask == None - inputDataObjs = dict( ( ("subfile%03d" % c[0], c[1]) - for c in enumerate(self._splittedFiles) ) ) - outputDataObjs = {"completeFile": self._completeFile} - gatherTask = TaskCreator( inputDataObjs = inputDataObjs, - outputDataObjs = outputDataObjs, - URL = "task://gather/%s" % self._path , - TaskType=TaskType) ( function ) - self._gatherTask = gatherTask - - def setScatterTask(self, TaskCreator, TaskType, function): - assert self._gatherTask == None - outputDataObjs = dict( ( ("subfile%03d" % c[0], c[1]) - for c in enumerate(self._splittedFiles) ) ) - inputDataObjs = {"completeFile": self._completeFile} - scatterTask = TaskCreator( inputDataObjs = inputDataObjs, - outputDataObjs = outputDataObjs, - URL = "task://scatter/%s" % self._path , - TaskType=TaskType) ( function ) - self._scatterTask = scatterTask - - def getGatherTask(self): - return self._gatherTask - - def getScatterTask(self): - return self._scatterTask - - def getSplittedFiles(self): - return self._splittedFiles - - @property - def timeStamp(self): - return self._completeFile.timeStamp - -def makePypeLocalFile(aLocalFileName, readOnly = False, scheme="file", **attributes): - """ - >>> f = makePypeLocalFile("/tmp/test.txt") - >>> f.localFileName == "/tmp/test.txt" - True - >>> fn(f) - '/tmp/test.txt' - """ - aLocalFileName = os.path.abspath(aLocalFileName) - - #if aLocalFileName.startswith("/"): - # aLocalFileName.lstrip("/") - - return PypeLocalFile("%s://localhost%s" % (scheme, aLocalFileName), readOnly, **attributes) - -def makePypeLocalStateFile(stateName, readOnly = False, **attributes): - dirname, basename = os.path.split(stateName) - stateFileName = os.path.join(dirname, "."+basename) - return makePypeLocalFile( stateFileName, readOnly = readOnly, scheme = "state", **attributes) - -if __name__ == "__main__": - import doctest - doctest.testmod() - diff --git a/pypeFLOW/pypeflow/pwatcher_bridge.py b/pypeFLOW/pypeflow/pwatcher_bridge.py deleted file mode 100644 index d8b78c8..0000000 --- a/pypeFLOW/pypeflow/pwatcher_bridge.py +++ /dev/null @@ -1,391 +0,0 @@ -"""Bridge pattern to adapt pypeFLOW with pwatcher. - -This is a bit messy, but it avoids re-writing the useful bits -of pypeFLOW. - -With PypeProcWatcherWorkflow, the refreshTargets() loop will -be single-threaded! 
-""" -from pypeflow.task import PypeTask, PypeThreadTaskBase, PypeTaskBase, TaskFunctionError -from pypeflow.data import fn -import pwatcher.blocking -import pwatcher.fs_based -import pwatcher.network_based -import pypeflow.controller -import pypeflow.task -import collections -import datetime -import glob -import hashlib -import json -import logging -import os -import pprint -import re -import sys -import time -import traceback - -log = logging.getLogger(__name__) - -def PypeProcWatcherWorkflow( - URL = None, - job_type='local', - job_queue='UNSPECIFIED_QUEUE', - watcher_type='fs_based', - watcher_directory='mypwatcher', - max_jobs=None, # ignore here for now - **attributes): - """Factory for the workflow using our new - filesystem process watcher. - """ - if watcher_type == 'blocking': - pwatcher_impl = pwatcher.blocking - elif watcher_type == 'network_based': - pwatcher_impl = pwatcher.network_based - else: - pwatcher_impl = pwatcher.fs_based - log.warning('In pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl)) - log.info('In pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl)) - watcher = pwatcher_impl.get_process_watcher(watcher_directory) - log.info('job_type={!r}, job_queue={!r}'.format(job_type, job_queue)) - th = MyPypeFakeThreadsHandler(watcher, job_type, job_queue) - mq = MyMessageQueue() - se = MyFakeShutdownEvent() # TODO: Save pwatcher state on ShutdownEvent. (Not needed for blocking pwatcher. Mildly useful for fs_based.) - return pypeflow.controller._PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se, - attributes=attributes) - -PypeProcWatcherWorkflow.setNumThreadAllowed = pypeflow.controller._PypeConcurrentWorkflow.setNumThreadAllowed - -class Fred(object): - """Fake thread. - """ - INITIALIZED = 10 - STARTED = 20 - RUNNING = 30 - JOINED = 40 - def is_alive(self): - return self.__status in (Fred.STARTED, Fred.RUNNING) - def start(self): - self.__status = Fred.STARTED - self.__th.enqueue(self) - def join(self, timeout=None): - # Maybe we should wait until the sentinel is visible in the filesystem? - # Also, if we were STARTED but not RUNNING, then we never did anything! - - #assert self.__status is not Fred.JOINED # Nope. Might be called a 2nd time by notifyTerminate(). - self.__status = Fred.JOINED - # And our own special methods. - def task(self): - return self.__target - def generate(self): - self.__target() - def setTargetStatus(self, status): - self.__target.setStatus(status) - def endrun(self, status): - """By convention for now, status is one of: - 'DEAD' - 'UNSUBMITTED' (a pseudo-status defined in the ready-loop of alive()) - 'EXIT rc' - """ - name = status.split()[0] - if name == 'DEAD': - log.warning(''.join(traceback.format_stack())) - log.error('Task {}\n is DEAD, meaning no HEARTBEAT, but this can be a race-condition. If it was not killed, then restarting might suffice. Otherwise, you might have excessive clock-skew.'.format(self.brief())) - self.setTargetStatus(pypeflow.task.TaskFail) # for lack of anything better - elif name == 'UNSUBMITTED': - log.warning(''.join(traceback.format_stack())) - log.error('Task {}\n is UNSUBMITTED, meaning job-submission somehow failed. Possibly a re-start would work. 
Otherwise, you need to investigate.'.format(self.brief())) - self.setTargetStatus(pypeflow.task.TaskFail) # for lack of anything better - elif name != 'EXIT': - raise Exception('Unexpected status {!r}'.format(name)) - else: - code = int(status.split()[1]) - if 0 == code: - self.__target.check_missing() - # TODO: If missing, just leave the status as TaskInitialized? - else: - log.error('Task {} failed with exit-code={}'.format(self.brief(), code)) - self.setTargetStatus(pypeflow.task.TaskFail) # for lack of anything better - self.__target.finish() - def brief(self): - return 'Fred{}'.format(self.__target.brief()) - def __repr__(self): - return 'FRED with taskObj={!r}'.format(self.__target) - def __init__(self, target, th): - assert isinstance(target, MyFakePypeThreadTaskBase) - self.__target = target # taskObj? - self.__th = th # thread handler - self.__status = Fred.INITIALIZED - -class MyMessageQueue(object): - def empty(self): - return not self.__msgq - def get(self): - return self.__msgq.popleft() - def put(self, msg): - self.__msgq.append(msg) - def __init__(self): - self.__msgq = collections.deque() - -class MyFakeShutdownEvent(object): - """I do not see any code which actually uses the - shutdown_event, but if needed someday, we can use this. - """ - def set(self): - pass - -_prev_q = {} # To avoid excessive log output. - -class MyPypeFakeThreadsHandler(object): - """Stateless method delegator, for injection. - """ - def create(self, target): - thread = Fred(target=target, th=self) - return thread - def alive(self, threads): - ready = dict() - while self.__jobq: - fred = self.__jobq.popleft() - taskObj = fred.task() - fred.generate() # -> taskObj->generated_script_fn by convention - #log.info('param:\n%s' %pprint.pformat(taskObj.parameters)) # I do not think these change. - try: - script_fn = taskObj.generated_script_fn # BY CONVENTION - except AttributeError: - log.warning('Missing taskObj.generated_script_fn for task. Maybe we did not need it? Skipping and continuing.') - fred.endrun('EXIT 0') - continue - log.info('script_fn:%s' %repr(script_fn)) - content = open(script_fn).read() - digest = hashlib.sha256(content).hexdigest() - jobid = 'J{}'.format(digest) - log.info('jobid=%s' %jobid) - taskObj.jobid = jobid - ready[jobid] = fred - self.__known[jobid] = fred - if ready: - # Start anything in the 'ready' queue. - # Note: It is safe to run this block always, but we save a - # call to pwatcher with 'if ready'. - log.debug('ready dict keys:\n%s' %pprint.pformat(ready.keys())) - jobids = dict() - #sge_option='-pe smp 8 -q default' - for jobid, fred in ready.iteritems(): - generated_script_fn = fred.task().generated_script_fn - rundir, basename = os.path.split(os.path.abspath(generated_script_fn)) - cmd = '/bin/bash {}'.format(basename) - sge_option = fred.task().parameters.get('sge_option', None) - job_type = fred.task().parameters.get('job_type', None) - job_queue = fred.task().parameters.get('job_queue', None) - job_nprocs = fred.task().parameters.get('job_nprocs', None) - jobids[jobid] = { - 'cmd': cmd, - 'rundir': rundir, - # These are optional: - 'job_type': job_type, - 'job_queue': job_queue, - 'job_nprocs': job_nprocs, - 'sge_option': sge_option, - } - # Also send the default type and queue-name. 
- watcher_args = { - 'jobids': jobids, - 'job_type': self.__job_type, - 'job_queue': self.__job_queue, - } - result = self.watcher.run(**watcher_args) - log.debug('Result of watcher.run()={}'.format(repr(result))) - submitted = result['submitted'] - self.__running.update(submitted) - #log.info("QQQ ADDED: {}".format(jobid)) - for jobid in set(jobids.keys()) - set(submitted): - fred = ready[jobid] - fred.endrun('UNSUBMITTED') - - watcher_args = { - 'jobids': list(self.__running), - 'which': 'list', - } - q = self.watcher.query(**watcher_args) - #log.debug('In alive(), result of query:%s' %repr(q)) - global _prev_q - if q != _prev_q: - #log.debug('In alive(), updated result of query:%s' %repr(q)) - _prev_q = q - _prev_q = None - for jobid, status in q['jobids'].iteritems(): - if status.startswith('EXIT') or status.startswith('DEAD'): - self.__running.remove(jobid) - #log.info("QQQ REMOVED: {}".format(jobid)) - fred = self.__known[jobid] - try: - fred.endrun(status) - except Exception as e: - msg = 'Failed to clean-up FakeThread: jobid={} status={}'.format(jobid, repr(status)) - log.exception(msg) - raise - #log.info('len(jobq)==%d' %len(self.__jobq)) - #log.info(''.join(traceback.format_stack())) - return sum(thread.is_alive() for thread in threads) - def join(self, threads, timeout): - then = datetime.datetime.now() - for thread in threads: - #assert thread is not threading.current_thread() - if thread.is_alive(): - to = max(0, timeout - (datetime.datetime.now() - then).seconds) - # This is called only in the refreshTargets() catch, so - # it can simply terminate all threads. - thread.join(to) - self.notifyTerminate(threads) - def notifyTerminate(self, threads): - """Assume these are daemon threads. - We will attempt to join them all quickly, but non-daemon threads may - eventually block the program from quitting. - """ - pass #self.join(threads, 1) - # TODO: Terminate only the jobs for 'threads'. - # For now, use 'known' instead of 'infer' b/c we - # also want to kill merely queued jobs, though that is currently difficult. - watcher_args = { - 'jobids': list(self.__running), - 'which': 'known', - } - q = self.watcher.delete(**watcher_args) - log.debug('In notifyTerminate(), result of delete:%s' %repr(q)) - - - # And our special methods. - def enqueue(self, fred): - self.__jobq.append(fred) - def __init__(self, watcher, job_type, job_queue=None): - """ - job_type and job_queue are defaults, possibly over-ridden for specific jobs. - Note: job_queue is a string, not a collection. If None, then it would need to - come via per-job settings. - """ - self.watcher = watcher - self.__job_type = job_type - self.__job_queue = job_queue - self.__jobq = collections.deque() - self.__running = set() - self.__known = dict() - -def makedirs(path): - if not os.path.isdir(path): - log.debug('makedirs {!r}'.format(path)) - os.makedirs(path) - -class MyFakePypeThreadTaskBase(PypeThreadTaskBase): - """Fake for PypeConcurrentWorkflow. - It demands a subclass, even though we do not use threads at all. - Here, we override everything that it overrides. PypeTaskBase defaults are fine. - """ - @property - def nSlots(self): - """(I am not sure what happend if > 1, but we will not need that. 
~cdunn) - Return the required number of slots to run, total number of slots is determined by - PypeThreadWorkflow.MAX_NUMBER_TASK_SLOT, increase this number by passing desired number - through the "parameters" argument (e.g parameters={"nSlots":2}) to avoid high computationa - intensive job running concurrently in local machine One can set the max number of thread - of a workflow by PypeThreadWorkflow.setNumThreadAllowed() - """ - try: - nSlots = self.parameters["nSlots"] - except AttributeError: - nSlots = 1 - except KeyError: - nSlots = 1 - return nSlots - - def setMessageQueue(self, q): - self._queue = q - - def setShutdownEvent(self, e): - self.shutdown_event = e - - def __call__(self, *argv, **kwargv): - """Trap all exceptions, set fail flag, SEND MESSAGE, log, and re-raise. - """ - try: - return self.runInThisThread(*argv, **kwargv) - except: # and re-raise - #log.exception('%s __call__ failed:\n%r' %(type(self).__name__, self)) - self._status = pypeflow.task.TaskFail # TODO: Do not touch internals of base class. - self._queue.put( (self.URL, pypeflow.task.TaskFail) ) - raise - - def runInThisThread(self, *argv, **kwargv): - """ - Similar to the PypeTaskBase.run(), but it provide some machinary to pass information - back to the main thread that run this task in a sepearated thread through the standard python - queue from the Queue module. - - We expect this to be used *only* for tasks which generate run-scripts. - Our version does not actually run the script. Instead, we expect the script-filename to be returned - by run(). - """ - if self._queue == None: - raise Exception('There seems to be a case when self.queue==None, so we need to let this block simply return.') - self._queue.put( (self.URL, "started, runflag: %d" % True) ) - return self.run(*argv, **kwargv) - - # We must override this from PypeTaskBase b/c we do *not* produce outputs - # immediately. - def run(self, *argv, **kwargv): - argv = list(argv) - argv.extend(self._argv) - kwargv.update(self._kwargv) - - inputDataObjs = self.inputDataObjs - self.syncDirectories([o.localFileName for o in inputDataObjs.values()]) - - outputDataObjs = self.outputDataObjs - for datao in outputDataObjs.values(): - makedirs(os.path.dirname(fn(datao))) - parameters = self.parameters - - log.info('Running task from function %s()' %(self._taskFun.__name__)) - rtn = self._runTask(self, *argv, **kwargv) - - if self.inputDataObjs != inputDataObjs or self.parameters != parameters: - raise TaskFunctionError("The 'inputDataObjs' and 'parameters' should not be modified in %s" % self.URL) - # Jason, note that this only tests whether self.parameters was rebound. - # If it is altered, then so is parameters, so the check would pass. - # TODO(CD): What is the importance of this test? Should it be fixed or deleted? - - return True # to indicate that it run, since we no longer rely on runFlag - - def check_missing(self): - timeout_s = 30 - sleep_s = .1 - self.syncDirectories([o.localFileName for o in self.outputDataObjs.values()]) # Sometimes helps in NFS. 
- lastUpdate = datetime.datetime.now() - while timeout_s > (datetime.datetime.now()-lastUpdate).seconds: - missing = [(k,o) for (k,o) in self.outputDataObjs.iteritems() if not o.exists] - if missing: - log.debug("%s failed to generate all outputs; %s; missing:\n%s" %( - self.URL, repr(self._status), - pprint.pformat(missing), - )) - #import commands - #cmd = 'pstree -pg -u cdunn' - #output = commands.getoutput(cmd) - #log.debug('`%s`:\n%s' %(cmd, output)) - dirs = set(os.path.dirname(o.localFileName) for o in self.outputDataObjs.itervalues()) - for d in dirs: - log.debug('listdir(%s): %s' %(d, repr(os.listdir(d)))) - #self._status = pypeflow.task.TaskFail - time.sleep(sleep_s) - sleep_s *= 2.0 - else: - self._status = pypeflow.task.TaskDone - break - else: - log.info('timeout(%ss) in check_missing()' %timeout_s) - self._status = pypeflow.task.TaskFail - - # And our own special methods. - def finish(self): - self.syncDirectories([o.localFileName for o in self.outputDataObjs.values()]) - self._queue.put( (self.URL, self._status) ) diff --git a/pypeFLOW/pypeflow/pwatcher_workflow.py b/pypeFLOW/pypeflow/pwatcher_workflow.py new file mode 100644 index 0000000..0ca0384 --- /dev/null +++ b/pypeFLOW/pypeflow/pwatcher_workflow.py @@ -0,0 +1,9 @@ +from .simple_pwatcher_bridge import ( + PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase, + makePypeLocalFile, fn, PypeTask) +PypeThreadTaskBase = MyFakePypeThreadTaskBase + +__all__ = [ + 'PypeProcWatcherWorkflow', 'PypeThreadTaskBase', + 'makePypeLocalFile', 'fn', 'PypeTask', +] diff --git a/pypeFLOW/pypeflow/simple_pwatcher_bridge.py b/pypeFLOW/pypeflow/simple_pwatcher_bridge.py index d1c7467..3b2fe59 100644 --- a/pypeFLOW/pypeflow/simple_pwatcher_bridge.py +++ b/pypeFLOW/pypeflow/simple_pwatcher_bridge.py @@ -77,6 +77,9 @@ class PwatcherTaskQueue(object): rundir, basename = os.path.split(os.path.abspath(generated_script_fn)) cmd = '/bin/bash {}'.format(basename) + LOG.debug('In rundir={!r}, sge_option={!r}, __sge_option={!r}'.format( + rundir, + node.pypetask.parameters.get('sge_option'), self.__sge_option)) sge_option = node.pypetask.parameters.get('sge_option', self.__sge_option) job_type = node.pypetask.parameters.get('job_type', None) job_queue = node.pypetask.parameters.get('job_queue', None) @@ -241,13 +244,13 @@ class Workflow(object): len(unsubmitted), len(to_submit), unsubmitted)) #ready.update(unsubmitted) # Resubmit only in pwatcher, if at all. submitted.update(to_submit - unsubmitted) - LOG.debug('N in queue: {}'.format(len(submitted))) + LOG.debug('N in queue: {} (max_jobs={})'.format(len(submitted), self.max_jobs)) recently_done = set(self.tq.check_done()) if not recently_done: if not submitted: LOG.warning('Nothing is happening, and we had {} failures. Should we quit? 
Instead, we will just sleep.'.format(failures)) #break - LOG.info('sleep {}'.format(sleep_time)) + LOG.info('sleep {}s'.format(sleep_time)) time.sleep(sleep_time) sleep_time = sleep_time + 0.1 if (sleep_time < updateFreq) else updateFreq continue @@ -430,8 +433,8 @@ def find_work_dir(paths): """ dirnames = set(os.path.dirname(os.path.normpath(p)) for p in paths) if len(dirnames) != 1: - raise Exception('Cannot find work-dir for paths in multiple (or zero) dirs: {!r}'.format( - paths)) + raise Exception('Cannot find work-dir for paths in multiple (or zero) dirs: {} in {}'.format( + pprint.pformat(paths), pprint.pformat(dirnames))) d = dirnames.pop() return os.path.abspath(d) class PypeLocalFile(object): @@ -554,13 +557,12 @@ def PypeProcWatcherWorkflow( LOG.warning('In simple_pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl)) LOG.info('In simple_pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl)) watcher = pwatcher_impl.get_process_watcher(watcher_directory) - LOG.info('job_type={!r}, job_queue={!r}'.format(job_type, job_queue)) + LOG.info('job_type={!r}, job_queue={!r}, sge_option={!r}'.format(job_type, job_queue, sge_option)) return Workflow(watcher, job_type=job_type, job_queue=job_queue, max_jobs=max_jobs, sge_option=sge_option) #th = MyPypeFakeThreadsHandler('mypwatcher', job_type, job_queue) #mq = MyMessageQueue() #se = MyFakeShutdownEvent() # TODO: Save pwatcher state on ShutdownEvent. (Not needed for blocking pwatcher. Mildly useful for fs_based.) #return pypeflow.controller._PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se, # attributes=attributes) -PypeProcWatcherWorkflow.setNumThreadAllowed = lambda x, y: None __all__ = ['PypeProcWatcherWorkflow', 'fn', 'makePypeLocalFile', 'MyFakePypeThreadTaskBase', 'PypeTask'] diff --git a/pypeFLOW/pypeflow/task.py b/pypeFLOW/pypeflow/task.py deleted file mode 100644 index 32564ab..0000000 --- a/pypeFLOW/pypeflow/task.py +++ /dev/null @@ -1,892 +0,0 @@ - -# @author Jason Chin -# -# Copyright (C) 2010 by Jason Chin -# Copyright (C) 2011 by Jason Chin, Pacific Biosciences -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -""" - -PypeTask: This module provides the PypeTask class and the decorators that can convert -a regular python funtion into a PypeTask instance. 
- -""" - -import pprint -import inspect -import hashlib -import logging -import copy -import sys -import time - -PYTHONVERSION = sys.version_info[:2] -if PYTHONVERSION == (2,5): - import simplejson as json -else: - import json - -import os -import shlex - -from common import PypeError, PypeObject, pypeNS, runShellCmd, Graph, URIRef, Literal -from data import FileNotExistError, PypeSplittableLocalFile, makePypeLocalFile - -logger = logging.getLogger(__name__) - -class TaskFunctionError(PypeError): - pass - -# These must be strings. -TaskInitialized = "TaskInitialized" -TaskDone = "done" -TaskFail = "fail" -# TODO(CD): Make user-code compare by variable name. - -class PypeTaskBase(PypeObject): - """ - Represent a PypeTask. Subclass it to for different kind of - task. - """ - - supportedURLScheme = ["task"] - - def __init__(self, URL, *argv, **kwargv): - - """ - Constructor of a PypeTask. - """ - - PypeObject.__init__(self, URL, **kwargv) - - self._argv = argv - self._kwargv = kwargv - self._taskFun = kwargv['_taskFun'] - self._referenceMD5 = None - self._status = TaskInitialized - self._queue = None - self.shutdown_event = None - - - for defaultAttr in ["inputDataObjs", "outputDataObjs", "parameters", "mutableDataObjs"]: - if defaultAttr not in self.__dict__: - self.__dict__[defaultAttr] = {} - - # "input" and "output" short cut - if "inputs" in kwargv: - self.inputDataObjs.update(kwargv["inputs"]) - del kwargv["inputs"] - - if "outputs" in kwargv: - self.outputDataObjs.update(kwargv["outputs"]) - del kwargv["outputs"] - - if "mutables" in kwargv: - self.mutableDataObjs.update(kwargv["mutables"]) - del kwargv["mutables"] - - #the keys in inputDataObjs/outputDataObjs/parameters will become a task attribute - for defaultAttr in ["inputDataObjs", "outputDataObjs", "mutableDataObjs", "parameters"]: - vars(self).update(self.__dict__[defaultAttr]) - - if "chunk_id" in kwargv: - self.chunk_id = kwargv["chunk_id"] - - self._codeMD5digest = kwargv.get("_codeMD5digest", "") - self._paramMD5digest = kwargv.get("_paramMD5digest", "") - self._compareFunctions = kwargv.get("_compareFunctions", [ timeStampCompare ]) - - for o in self.outputDataObjs.values(): - if o.readOnly == True: - raise PypeError, "Cannot assign read only data object %s for task %s" % (o.URL, self.URL) - - @property - def status(self): - return self._status - - def setInputs( self, inputDataObjs ): - self.inputDataObjs = inputDataObjs - vars(self).update( inputDataObjs ) - - def setOutputs( self, outputDataObjs ): - self.outputDataObjs = outputDataObjs - vars(self).update( outputDataObjs ) - - def setReferenceMD5(self, md5Str): - self._referenceMD5 = md5Str - - def _getRunFlag(self): - """Determine whether the PypeTask should be run. It can be overridden in - subclass to allow more flexible rules. - """ - runFlag = False - if self._referenceMD5 is not None and self._referenceMD5 != self._codeMD5digest: - self._referenceMD5 = self._codeMD5digest - # Code has changed. - return True - return any( [ f(self.inputDataObjs, self.outputDataObjs, self.parameters) for f in self._compareFunctions] ) - - def isSatisfied(self): - """Compare dependencies. (Kinda expensive.) - Note: Do not call this while the task is actually running! - """ - return not self._getRunFlag() - - def getStatus(self): - """ - Note: Do not call this while the task is actually running! - """ - return self._status - - def setStatus(self, status): - """ - Note: Do not call this while the task is actually running! 
- """ - assert status in (TaskInitialized, TaskDone, TaskFail) - self._status = status - - def _runTask(self, *argv, **kwargv): - """ - The method to run the decorated function _taskFun(). It is called through run() of - the PypeTask object and it should never be called directly - - TODO: the arg porcessing is still a mess, need to find a better way to do this - """ - if PYTHONVERSION == (2,5): #TODO(CD): Does this even work anymore? - (args, varargs, varkw, defaults) = inspect.getargspec(self._taskFun) - #print (args, varargs, varkw, defaults) - else: - argspec = inspect.getargspec(self._taskFun) - (args, varargs, varkw, defaults) = argspec.args, argspec.varargs, argspec.keywords, argspec.defaults - - if varkw != None: - return self._taskFun(self, *argv, **kwargv) - elif varargs != None: - return self._taskFun(self, *argv) - elif len(args) != 0: - nkwarg = {} - if defaults != None: - defaultArg = args[-len(defaults):] - for a in defaultArg: - nkwarg[a] = kwargv[a] - return self._taskFun(self, *argv, **nkwarg) - else: - return self._taskFun(self) - else: - return self._taskFun(self) - - @property - def _RDFGraph(self): - graph = Graph() - for k,v in self.__dict__.iteritems(): - if k == "URL": continue - if k[0] == "_": continue - if k in ["inputDataObjs", "outputDataObjs", "mutableDataObjs", "parameters"]: - if k == "inputDataObjs": - for ft, f in v.iteritems(): - graph.add( (URIRef(self.URL), pypeNS["prereq"], URIRef(f.URL) ) ) - elif k == "outputDataObjs": - for ft, f in v.iteritems(): - graph.add( (URIRef(f.URL), pypeNS["prereq"], URIRef(self.URL) ) ) - elif k == "mutableDataObjs": - for ft, f in v.iteritems(): - graph.add( (URIRef(self.URL), pypeNS["hasMutable"], URIRef(f.URL) ) ) - elif k == "parameters": - graph.add( (URIRef(self.URL), pypeNS["hasParameters"], Literal(json.dumps(v)) ) ) - - continue - - if k in self.inputDataObjs: - graph.add( ( URIRef(self.URL), pypeNS["inputDataObject"], URIRef(v.URL) ) ) - continue - - if k in self.outputDataObjs: - graph.add( ( URIRef(self.URL), pypeNS["outputDataObject"], URIRef(v.URL) ) ) - continue - - if k in self.mutableDataObjs: - graph.add( ( URIRef(self.URL), pypeNS["mutableDataObject"], URIRef(v.URL) ) ) - continue - - if hasattr(v, "URL"): - graph.add( ( URIRef(self.URL), pypeNS[k], URIRef(v.URL) ) ) - - graph.add( ( URIRef(self.URL), pypeNS["codeMD5digest"], Literal(self._codeMD5digest) ) ) - graph.add( ( URIRef(self.URL), pypeNS["parameterMD5digest"], Literal(self._paramMD5digest) ) ) - - return graph - - def __call__(self, *argv, **kwargv): - """Trap all exceptions, set fail flag, log, and re-raise. - If you need to do more, then over-ride this method. - """ - try: - return self.run(*argv, **kwargv) - except: # and re-raise - logger.exception('PypeTaskBase failed unexpectedly:\n%r' %self) - self._status = TaskFail - raise - - @staticmethod - def syncDirectories(fns): - # need the following loop to force the stupid Islon to update the metadata in the directory - # otherwise, the file would be appearing as non-existence... sigh, this is a >5 hours hard earned hacks - # Yes, a friend at AMD had this problem too. Painful. ~cd - for d in set(os.path.dirname(fn) for fn in fns): - try: - os.listdir(d) - except OSError: - pass - - def run(self, *argv, **kwargv): - """Determine whether a task should be run when called. - If the dependency is not satisified, - then the _taskFun() will be called to generate the output data objects. 
-
-        Derived class can over-ride this method, but if __call__ is over-ridden,
-        then derived must call this explicitly.
-        """
-        argv = list(argv)
-        argv.extend(self._argv)
-        kwargv.update(self._kwargv)
-
-        inputDataObjs = self.inputDataObjs
-        self.syncDirectories([o.localFileName for o in inputDataObjs.values()])
-
-        outputDataObjs = self.outputDataObjs
-        parameters = self.parameters
-
-        logger.info('Running task from function %s()' %(self._taskFun.__name__))
-        rtn = self._runTask(self, *argv, **kwargv)
-
-        if self.inputDataObjs != inputDataObjs or self.parameters != parameters:
-            raise TaskFunctionError("The 'inputDataObjs' and 'parameters' should not be modified in %s" % self.URL)
-        missing = [(k,o) for (k,o) in self.outputDataObjs.iteritems() if not o.exists]
-        if missing:
-            logger.debug("%s fails to generate all outputs; missing:\n%s" %(self.URL, pprint.pformat(missing)))
-            self._status = TaskFail
-        else:
-            self._status = TaskDone
-
-        return True # to indicate that it ran, since we no longer rely on runFlag
-
-    def __repr__(self):
-        r = dict()
-        r['_status'] = self._status
-        r['inputDataObjs'] = self.inputDataObjs
-        r['outputDataObjs'] = self.outputDataObjs
-        r['mutableDataObjs'] = self.mutableDataObjs
-        r['parameters'] = self.parameters
-        r['URL'] = getattr(self, 'URL', 'No URL?')
-        r['__class__.__name__'] = self.__class__.__name__
-        return pprint.pformat(r)
-    def brief(self):
-        r = dict()
-        r['URL'] = self.URL
-        return pprint.pformat(r)
-
-    def finalize(self):
-        """
-        This method is intended to be overridden by a subclass to provide extra processing that is not
-        directly related to processing the input and output data. For the thread workflow, this
-        method will be called in the main thread after a task is finished, regardless of the job status.
-        """
-        pass
-
-class PypeThreadTaskBase(PypeTaskBase):
-
-    """
-    Represent a PypeTask that can be run within a thread.
-    Subclass it for different kinds of tasks.
-    """
-
-    @property
-    def nSlots(self):
-        """
-        Return the required number of slots to run. The total number of slots is determined by
-        PypeThreadWorkflow.MAX_NUMBER_TASK_SLOT; increase this number by passing the desired number
-        through the "parameters" argument (e.g. parameters={"nSlots":2}) to avoid highly
-        computation-intensive jobs running concurrently on the local machine. One can set the max
-        number of threads of a workflow by PypeThreadWorkflow.setNumThreadAllowed()
-        """
-        try:
-            nSlots = self.parameters["nSlots"]
-        except AttributeError:
-            nSlots = 1
-        except KeyError:
-            nSlots = 1
-        return nSlots
-
-
-    def setMessageQueue(self, q):
-        self._queue = q
-
-    def setShutdownEvent(self, e):
-        self.shutdown_event = e
-
-    def __call__(self, *argv, **kwargv):
-        """Trap all exceptions, set fail flag, SEND MESSAGE, log, and re-raise.
-        """
-        try:
-            return self.runInThisThread(*argv, **kwargv)
-        except: # and re-raise
-            logger.exception('PypeTaskBase failed:\n%r' %self)
-            self._status = TaskFail # TODO: Do not touch internals of base class.
-            self._queue.put( (self.URL, TaskFail) )
-            raise
-
-    def runInThisThread(self, *argv, **kwargv):
-        """
-        Similar to PypeTaskBase.run(), but it provides some machinery to pass information
-        back to the main thread that runs this task in a separate thread, through the standard python
-        queue from the Queue module.
-        """
-        if self._queue == None:
-            logger.debug('Testing threads w/out queue?')
-            self.run(*argv, **kwargv)
-            # return
-            # raise until we know what this should do.
- raise Exception('There seems to be a case when self.queue==None, so we need to let this block simply return.') - - self._queue.put( (self.URL, "started, runflag: %d" % True) ) - self.run(*argv, **kwargv) - - self.syncDirectories([o.localFileName for o in self.outputDataObjs.values()]) - - self._queue.put( (self.URL, self._status) ) - -class PypeDistributiableTaskBase(PypeThreadTaskBase): - - """ - Represent a PypeTask that can be run within a thread or submit to - a grid-engine like job scheduling system. - Subclass it to for different kind of task. - """ - - def __init__(self, URL, *argv, **kwargv): - PypeTaskBase.__init__(self, URL, *argv, **kwargv) - self.distributed = True - - -class PypeTaskCollection(PypeObject): - - """ - Represent an object that encapsules a number of tasks - """ - - supportedURLScheme = ["tasks"] - def __init__(self, URL, tasks = [], scatterGatherTasks = [], **kwargv): - PypeObject.__init__(self, URL, **kwargv) - self._tasks = tasks[:] - self._scatterGatherTasks = scatterGatherTasks[:] - - def addTask(self, task): - self._tasks.append(task) - - def getTasks(self): - return self._tasks - - def addScatterGatherTask(self, task): - self._scatterGatherTasks.append(task) - - def getScatterGatherTasks(self): - return self._scatterGatherTasks - - def __getitem__(self, k): - return self._tasks[k] - -_auto_names = set() -def _unique_name(name): - """ - >>> def foo(): pass - >>> _unique_name('foo') - 'foo' - >>> _unique_name('foo') - 'foo.01' - >>> _unique_name('foo') - 'foo.02' - """ - if name in _auto_names: - n = 0 - while True: - n += 1 - try_name = '%s.%02d' %(name, n) - if try_name not in _auto_names: - break - name = try_name - _auto_names.add(name) - return name -def _auto_task_url(taskFun): - # Note: in doctest, the filename would be weird. - return "task://" + inspect.getfile(taskFun) + "/"+ _unique_name(taskFun.func_name) - -def PypeTask(**kwargv): - - """ - A decorator that converts a function into a PypeTaskBase object. - - >>> import os, time - >>> from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn - >>> from pypeflow.task import * - >>> try: - ... os.makedirs("/tmp/pypetest") - ... _ = os.system("rm -f /tmp/pypetest/*") - ... except Exception: - ... pass - >>> time.sleep(.1) - >>> fin = makePypeLocalFile("/tmp/pypetest/testfile_in", readOnly=False) - >>> fout = makePypeLocalFile("/tmp/pypetest/testfile_out", readOnly=False) - >>> @PypeTask(outputs={"test_out":fout}, - ... inputs={"test_in":fin}, - ... parameters={"a":'I am "a"'}, **{"b":'I am "b"'}) - ... def test(self): - ... print test.test_in.localFileName - ... print test.test_out.localFileName - ... os.system( "touch %s" % fn(test.test_out) ) - ... print self.test_in.localFileName - ... print self.test_out.localFileName - ... 
pass - >>> type(test) - <class 'pypeflow.task.PypeTaskBase'> - >>> test.test_in.localFileName - '/tmp/pypetest/testfile_in' - >>> test.test_out.localFileName - '/tmp/pypetest/testfile_out' - >>> os.system( "touch %s" % ( fn(fin)) ) - 0 - >>> timeStampCompare(test.inputDataObjs, test.outputDataObjs, test.parameters) - True - >>> print test._getRunFlag() - True - >>> test() - /tmp/pypetest/testfile_in - /tmp/pypetest/testfile_out - /tmp/pypetest/testfile_in - /tmp/pypetest/testfile_out - True - >>> timeStampCompare(test.inputDataObjs, test.outputDataObjs, test.parameters) - False - >>> print test._getRunFlag() - False - >>> print test.a - I am "a" - >>> print test.b - I am "b" - >>> os.system( "touch %s" % (fn(fin)) ) - 0 - >>> # test PypeTask.finalize() - >>> from controller import PypeWorkflow - >>> wf = PypeWorkflow() - >>> wf.addTask(test) - >>> def finalize(self): - ... def f(): - ... print "in finalize:", self._status - ... return f - >>> test.finalize = finalize(test) # For testing only. Please don't do this in your code. The PypeTask.finalized() is intended to be overriden by subclasses. - >>> wf.refreshTargets( objs = [fout] ) - /tmp/pypetest/testfile_in - /tmp/pypetest/testfile_out - /tmp/pypetest/testfile_in - /tmp/pypetest/testfile_out - in finalize: done - True - >>> #The following code show how to set up a task with a PypeThreadWorkflow that allows running multitple tasks in parallel. - >>> from pypeflow.controller import PypeThreadWorkflow - >>> wf = PypeThreadWorkflow() - >>> @PypeTask(outputDataObjs={"test_out":fout}, - ... inputDataObjs={"test_in":fin}, - ... TaskType=PypeThreadTaskBase, - ... parameters={"a":'I am "a"'}, **{"b":'I am "b"'}) - ... def test(self): - ... print test.test_in.localFileName - ... print test.test_out.localFileName - ... os.system( "touch %s" % fn(test.test_out) ) - ... print self.test_in.localFileName - ... print self.test_out.localFileName - >>> wf.addTask(test) - >>> def finalize(self): - ... def f(): - ... print "in finalize:", self._status - ... return f - >>> test.finalize = finalize(test) # For testing only. Please don't do this in your code. The PypeTask.finalized() is intended to be overided by subclasses. 
- >>> wf.refreshTargets( objs = [fout] ) #doctest: +SKIP - """ - - def f(taskFun): - - TaskType = kwargv.get("TaskType", PypeTaskBase) - - if "TaskType" in kwargv: - del kwargv["TaskType"] - - kwargv["_taskFun"] = taskFun - - if kwargv.get("URL",None) == None: - kwargv["URL"] = _auto_task_url(taskFun) - try: - kwargv["_codeMD5digest"] = hashlib.md5(inspect.getsource(taskFun)).hexdigest() - except IOError: #python2.7 seems having problem to get source code from docstring, this is a work around to make docstring test working - kwargv["_codeMD5digest"] = "" - kwargv["_paramMD5digest"] = hashlib.md5(repr(kwargv)).hexdigest() - - newKwargv = copy.copy(kwargv) - inputDataObjs = kwargv.get("inputDataObjs",{}) - inputDataObjs.update(kwargv.get("inputs", {})) - outputDataObjs = kwargv.get("outputDataObjs",{}) - outputDataObjs.update(kwargv.get("outputs", {})) - newInputs = {} - for inputKey, inputDO in inputDataObjs.items(): - if isinstance(inputDO, PypeSplittableLocalFile): - newInputs[inputKey] = inputDO._completeFile - else: - newInputs[inputKey] = inputDO - - newOutputs = {} - for outputKey, outputDO in outputDataObjs.items(): - if isinstance(outputDO, PypeSplittableLocalFile): - newOutputs[outputKey] = outputDO._completeFile - else: - newOutputs[outputKey] = outputDO - - newKwargv["inputDataObjs"] = newInputs - newKwargv["outputDataObjs"] = newOutputs - task = TaskType(**newKwargv) - task.__doc__ = taskFun.__doc__ - return task - - return f - -def PypeShellTask(**kwargv): - - """ - A function that converts a shell script into a PypeTaskBase object. - - >>> import os, time - >>> from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn - >>> from pypeflow.task import * - >>> try: - ... os.makedirs("/tmp/pypetest") - ... _ = os.system("rm -f /tmp/pypetest/*") - ... except Exception: - ... pass - >>> time.sleep(.1) - >>> fin = makePypeLocalFile("/tmp/pypetest/testfile_in", readOnly=False) - >>> fout = makePypeLocalFile("/tmp/pypetest/testfile_out", readOnly=False) - >>> f = open("/tmp/pypetest/shellTask.sh","w") - >>> f.write( "touch %s" % (fn(fout))) - >>> f.close() - >>> shellTask = PypeShellTask(outputDataObjs={"test_out":fout}, - ... inputDataObjs={"test_in":fin}, - ... parameters={"a":'I am "a"'}, **{"b":'I am "b"'}) - >>> shellTask = shellTask("/tmp/pypetest/shellTask.sh") - >>> type(shellTask) - <class 'pypeflow.task.PypeTaskBase'> - >>> print fn(shellTask.test_in) - /tmp/pypetest/testfile_in - >>> os.system( "touch %s" % fn(fin) ) - 0 - >>> timeStampCompare(shellTask.inputDataObjs, shellTask.outputDataObjs, shellTask.parameters) - True - >>> print shellTask._getRunFlag() - True - >>> shellTask() # run task - True - >>> timeStampCompare(shellTask.inputDataObjs, shellTask.outputDataObjs, shellTask.parameters) - False - >>> print shellTask._getRunFlag() - False - >>> shellTask() - True - """ - - def f(scriptToRun): - def taskFun(self): - """make shell script using a template""" - """run shell command""" - shellCmd = "/bin/bash %s" % scriptToRun - runShellCmd(shlex.split(shellCmd)) - - kwargv["script"] = scriptToRun - return PypeTask(**kwargv)(taskFun) - - return f - - -def PypeSGETask(**kwargv): - - """ - Similar to PypeShellTask, but the shell script job will be executed through SGE. 
- """ - - def f(scriptToRun): - - def taskFun(): - """make shell script using the template""" - """run shell command""" - shellCmd = "qsub -sync y -S /bin/bash %s" % scriptToRun - runShellCmd(shlex.split(shellCmd)) - - kwargv["script"] = scriptToRun - - return PypeTask(**kwargv)(taskFun) - - return f - -def PypeDistributibleTask(**kwargv): - - """ - Similar to PypeShellTask and PypeSGETask, with an additional argument "distributed" to decide - whether a job to be run through local shell or SGE. - """ - - distributed = kwargv.get("distributed", False) - def f(scriptToRun): - def taskFun(self): - """make shell script using the template""" - """run shell command""" - if distributed == True: - shellCmd = "qsub -sync y -S /bin/bash %s" % scriptToRun - else: - shellCmd = "/bin/bash %s" % scriptToRun - - runShellCmd(shlex.split(shellCmd)) - - kwargv["script"] = scriptToRun - return PypeTask(**kwargv)(taskFun) - - return f - - -def PypeScatteredTasks(**kwargv): - - def f(taskFun): - - TaskType = kwargv.get("TaskType", PypeTaskBase) - - if "TaskType" in kwargv: - del kwargv["TaskType"] - - kwargv["_taskFun"] = taskFun - - inputDataObjs = kwargv["inputDataObjs"] - outputDataObjs = kwargv["outputDataObjs"] - nChunk = None - scatteredInput = [] - - if kwargv.get("URL", None) == None: - kwargv["URL"] = "tasks://" + inspect.getfile(taskFun) + "/"+ taskFun.func_name - - tasks = PypeTaskCollection(kwargv["URL"]) - - for inputKey, inputDO in inputDataObjs.items(): - if hasattr(inputDO, "nChunk"): - if nChunk != None: - assert inputDO.nChunk == nChunk - else: - nChunk = inputDO.nChunk - if inputDO.getScatterTask() != None: - tasks.addScatterGatherTask( inputDO.getScatterTask() ) - - scatteredInput.append( inputKey ) - - for outputKey, outputDO in outputDataObjs.items(): - if hasattr(outputDO, "nChunk"): - if nChunk != None: - assert outputDO.nChunk == nChunk - if outputDO.getGatherTask() != None: - tasks.addScatterGatherTask( outputDO.getGatherTask() ) - else: - nChunk = outputDO.nChunk - - - for i in range(nChunk): - - newKwargv = copy.copy(kwargv) - - subTaskInput = {} - for inputKey, inputDO in inputDataObjs.items(): - if inputKey in scatteredInput: - subTaskInput[inputKey] = inputDO.getSplittedFiles()[i] - else: - subTaskInput[inputKey] = inputDO - - subTaskOutput = {} - for outputKey, outputDO in outputDataObjs.items(): - subTaskOutput[outputKey] = outputDO.getSplittedFiles()[i] - - newKwargv["inputDataObjs"] = subTaskInput - newKwargv["outputDataObjs"] = subTaskOutput - - #newKwargv["URL"] = "task://" + inspect.getfile(taskFun) + "/"+ taskFun.func_name + "/%03d" % i - newKwargv["URL"] = kwargv["URL"].replace("tasks","task") + "/%03d" % i - - try: - newKwargv["_codeMD5digest"] = hashlib.md5(inspect.getsource(taskFun)).hexdigest() - except IOError: - # python2.7 seems having problem to get source code from docstring, - # this is a work around to make docstring test working - newKwargv["_codeMD5digest"] = "" - - newKwargv["_paramMD5digest"] = hashlib.md5(repr(kwargv)).hexdigest() - newKwargv["chunk_id"] = i - - - tasks.addTask( TaskType(**newKwargv) ) - return tasks - return f - -getPypeScatteredTasks = PypeScatteredTasks - -def PypeFOFNMapTasks(**kwargv): - """ - A special decorator that takes a FOFN (file of file names) as the main - input and generate the tasks with the inputs are the files specified in - the FOFN - - Example: - - def outTemplate(fn): - return fn + ".out" - - def task(self, **kwargv): - in_f = self.in_f - out_f = self.out_f - #do something with in_f, and write something to out_f - - 
-def PypeFOFNMapTasks(**kwargv):
-    """
-    A special decorator that takes a FOFN (file of file names) as the main
-    input and generates one task per file listed in the FOFN, with that
-    file as the task's input.
-
-    Example:
-
-        def outTemplate(fn):
-            return fn + ".out"
-
-        def task(self, **kwargv):
-            in_f = self.in_f
-            out_f = self.out_f
-            #do something with in_f, and write something to out_f
-
-        tasks = PypeFOFNMapTasks(FOFNFileName = "./file.fofn",
-                                 outTemplateFunc = outTemplate,
-                                 TaskType = PypeThreadTaskBase,
-                                 parameters = dict(nSlots = 8))( task )
-    """
-
-    def f(taskFun):
-
-        TaskType = kwargv.get("TaskType", PypeTaskBase)
-
-        if "TaskType" in kwargv:
-            del kwargv["TaskType"]
-
-        kwargv["_taskFun"] = taskFun
-
-        FOFNFileName = kwargv["FOFNFileName"]
-        outTemplateFunc = kwargv["outTemplateFunc"]
-
-        if kwargv.get("URL", None) == None:
-            kwargv["URL"] = "tasks://" + inspect.getfile(taskFun) + "/" + taskFun.func_name
-
-        tasks = PypeTaskCollection(kwargv["URL"])
-
-        with open(FOFNFileName, "r") as FOFN:
-
-            newKwargv = copy.copy(kwargv)
-
-            for fn in FOFN:
-
-                fn = fn.strip()
-
-                if len(fn) == 0:
-                    continue
-
-                newKwargv["inputDataObjs"] = {"in_f": makePypeLocalFile(fn) }
-                outfileName = outTemplateFunc(fn)
-                newKwargv["outputDataObjs"] = {"out_f": makePypeLocalFile(outfileName) }
-                newKwargv["URL"] = kwargv["URL"].replace("tasks", "task") + "/%s" % hashlib.md5(fn).hexdigest()
-
-                try:
-                    newKwargv["_codeMD5digest"] = hashlib.md5(inspect.getsource(taskFun)).hexdigest()
-                except IOError:
-                    # Python 2.7 cannot get the source of code defined in a docstring;
-                    # fall back so the doctests still work.
-                    newKwargv["_codeMD5digest"] = ""
-
-                newKwargv["_paramMD5digest"] = hashlib.md5(repr(kwargv)).hexdigest()
-
-                tasks.addTask( TaskType(**newKwargv) )
-
-        allFOFNOutDataObjs = dict( [ ("FOFNout%03d" % t[0], t[1].in_f) for t in enumerate(tasks) ] )
-
-        def pseudoScatterTask(**kwargv):
-            pass
-
-        newKwargv = dict( inputDataObjs = {"FOFNin": makePypeLocalFile(FOFNFileName)},
-                          outputDataObjs = allFOFNOutDataObjs,
-                          _taskFun = pseudoScatterTask,
-                          _compareFunctions = [lambda inObjs, outObj, params: False], # this task is never meant to be run
-                          URL = "task://pseudoScatterTask/%s" % FOFNFileName)
-
-        tasks.addTask( TaskType(**newKwargv) )
-
-        return tasks
-
-    return f
-
-getFOFNMapTasks = PypeFOFNMapTasks
-
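The per-file URLs generated above embed an MD5 of the file name rather than a running index, so a task's identity stays stable even if the FOFN is reordered. A small self-contained illustration of that naming scheme (the file names are invented):

    import hashlib

    fofn_lines = ["data/subreads1.fa", "data/subreads2.fa"]
    baseURL = "tasks://example.py/mapTask"
    for fn in fofn_lines:
        # Same construction as the newKwargv["URL"] line above.
        taskURL = baseURL.replace("tasks", "task") + "/%s" % hashlib.md5(fn).hexdigest()
        print fn, "->", taskURL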
-def timeStampCompare( inputDataObjs, outputDataObjs, parameters) :
-
-    """
-    Given the inputDataObjs and the outputDataObjs, determine whether any
-    object in inputDataObjs was created or modified later than any object
-    in outputDataObjs.
-    """
-
-    runFlag = False
-
-    inputDataObjsTS = []
-    for ft, f in inputDataObjs.iteritems():
-        if not f.exists:
-            wait_s = 60
-            logger.warning('input does not exist yet (in this filesystem): %r - waiting up to %ds' %(f, wait_s))
-            for i in range(wait_s):
-                time.sleep(1)
-                if f.exists:
-                    logger.warning('now exists: %r (after only %ds)' %(f, i))
-                    break
-            # At this point, if it still does not exist, the entire workflow will fail.
-            # What else can we do? The user's filesystem has too much latency.
-        inputDataObjsTS.append((f.timeStamp, 'A', f))
-
-    outputDataObjsTS = []
-    for ft, f in outputDataObjs.iteritems():
-        if not f.exists:
-            logger.debug('output does not exist yet: %r'%f)
-            runFlag = True
-            break
-        else:
-            # 'A' < 'B', so outputs are 'later' if timestamps match.
-            outputDataObjsTS.append((f.timeStamp, 'B', f))
-
-    if not outputDataObjs:
-        # 0 outputs => always run
-        runFlag = True
-
-    if not runFlag and inputDataObjs: # 0 inputs would imply that existence of outputs is enough.
-        minOut = min(outputDataObjsTS)
-        maxIn = max(inputDataObjsTS)
-        if minOut < maxIn:
-            logger.debug('timestamp of output < input: %r < %r'%(minOut, maxIn))
-            runFlag = True
-
-    return runFlag
-
-if __name__ == "__main__":
-    import doctest
-    doctest.testmod()
diff --git a/pypeFLOW/setup.py b/pypeFLOW/setup.py
index df77bec..1397b35 100644
--- a/pypeFLOW/setup.py
+++ b/pypeFLOW/setup.py
@@ -2,22 +2,19 @@ from setuptools import setup, Extension, find_packages
 setup(
     name = 'pypeflow',
-    version='0.1.1',
+    version='1.0.0',
     author='J. Chin',
     author_email='[email protected]',
     license='LICENSE.txt',
     packages = [
         'pypeflow',
-        'pwatcher', # a separate package for here for convenience, for now
+        'pwatcher', # a separate package here for convenience, for now
         'pwatcher.mains',
     ],
     package_dir = {'':'.'},
     zip_safe = False,
     install_requires=[
-        'rdflib == 3.4.0',
-        'rdfextras >= 0.1',
-        'html5lib == 0.999999',
-        'networkx >=1.7, <=1.10',
+        'networkx >=1.7, <=1.11',
     ],
     entry_points = {'console_scripts': [
         'pwatcher-main=pwatcher.mains.pwatcher:main',
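A closing note on the dependency check that leaves with task.py: timeStampCompare reduces to a tuple comparison in which inputs are tagged 'A' and outputs 'B', so an output whose timestamp exactly equals an input's still counts as newer and no re-run is triggered. A minimal re-run of that logic (the timestamps are invented):

    inputsTS  = [(100, 'A', 'in.fa')]
    outputsTS = [(100, 'B', 'out.fa')]
    # min(outputs) < max(inputs) would force a re-run; since 'A' < 'B',
    # equal timestamps break the tie in favor of the outputs ("up to date").
    print min(outputsTS) < max(inputsTS)   # False -> no re-run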
