This is an automated email from the git hooks/post-receive script.

afif pushed a commit to branch master
in repository kineticstools.
commit 9b27a33cc63f195f78ad11e87de794f861bf6ce9
Author: Afif Elghraoui <[email protected]>
Date:   Mon Aug 22 23:11:01 2016 -0700

    Imported Upstream version 0.6.0+dfsg
---
 README.md                                 |   3 +-
 bin/__init__.py                           |  29 ----
 circle.yml                                |   3 +-
 kineticsTools/ReferenceUtils.py           |  11 +-
 kineticsTools/ResultWriter.py             | 241 ++++++++++++++++++++----------
 kineticsTools/ipdSummary.py               |  91 ++++++-----
 kineticsTools/summarizeModifications.py   |   2 +-
 kineticsTools/tasks/__init__.py           |   0
 kineticsTools/tasks/gather_kinetics_h5.py | 172 +++++++++++++++++++++
 requirements-ci.txt                       |   3 +-
 requirements-dev.txt                      |   1 -
 setup.py                                  |   6 +-
 test/cram/extra/bigwig.t                  |  15 ++
 test/cram/version.t                       |  19 +--
 test/test_gather_h5.py                    |  77 ++++++++++
 test/test_outputs.py                      | 105 +++++++++++++
 test/test_tool_contract.py                |  28 ++--
 17 files changed, 624 insertions(+), 182 deletions(-)

diff --git a/README.md b/README.md
index 1e95e30..ec8a88b 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,4 @@
-kineticsTools
-=============
+ kineticsTools
 
 [](https://circleci.com/gh/PacificBiosciences/kineticsTools)
 
 Tools for detecting DNA modifications from single molecule, real-time (SMRT®) sequencing data. This tool implements the P_ModificationDetection module in SMRT® Portal, used by the RS_Modification_Detection and RS_Modifications_and_Motif_Detection protocol. Researchers interested in understanding or extending the modification detection algorithms can use these tools as a starting point.

diff --git a/bin/__init__.py b/bin/__init__.py
deleted file mode 100755
index 402d7a8..0000000
--- a/bin/__init__.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#################################################################################
-# Copyright (c) 2011-2013, Pacific Biosciences of California, Inc.
-#
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-#     * Redistributions of source code must retain the above copyright
-#       notice, this list of conditions and the following disclaimer.
-#     * Redistributions in binary form must reproduce the above copyright
-#       notice, this list of conditions and the following disclaimer in the
-#       documentation and/or other materials provided with the distribution.
-#     * Neither the name of Pacific Biosciences nor the names of its
-#       contributors may be used to endorse or promote products derived from
-#       this software without specific prior written permission.
-#
-# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
-# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY PACIFIC BIOSCIENCES AND ITS
-# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL PACIFIC BIOSCIENCES OR
-# ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
-# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
-# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-#################################################################################

diff --git a/circle.yml b/circle.yml
index a1e6411..01a3209 100644
--- a/circle.yml
+++ b/circle.yml
@@ -12,7 +12,8 @@ dependencies:
     - (cd .circleci && bash installHDF5.sh)
     - HDF5_DIR=$PWD/.circleci/prefix pip install -r requirements-ci.txt
     - HDF5_DIR=$PWD/.circleci/prefix pip install -r requirements-dev.txt
+    - pip install cram nose
 test:
   override:
-    - make test # Run doctests in addition to the usual unit tests
+    - make unit-tests

diff --git a/kineticsTools/ReferenceUtils.py b/kineticsTools/ReferenceUtils.py
index a48ca8d..e63758c 100755
--- a/kineticsTools/ReferenceUtils.py
+++ b/kineticsTools/ReferenceUtils.py
@@ -46,7 +46,7 @@ ReferenceWindow = namedtuple("ReferenceWindow", ["refId", "refName", "start", "e
 class ReferenceUtils():

     @staticmethod
-    def loadReferenceContigs(referencePath, alignmentSet):
+    def loadReferenceContigs(referencePath, alignmentSet, windows=None):
         # FIXME we should get rid of this entirely, but I think it requires
         # fixing the inconsistency in how contigs are referenced here versus in
         # pbcore.io
@@ -58,7 +58,14 @@ class ReferenceUtils():

         # Read contigs from FASTA file (or XML dataset)
         refReader = ReferenceSet(referencePath)
-        contigs = [x for x in refReader]
+        contigs = []
+        if windows is not None:
+            refNames = set([rw.refName for rw in windows])
+            for contig in refReader:
+                if contig.id in refNames:
+                    contigs.append(contig)
+        else:
+            contigs.extend([x for x in refReader])
         contigDict = dict([(x.id, x) for x in contigs])

         # initially each contig has an id of None -- this will be overwritten with the id from the cmp.h5, if there are any
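A note on the new optional windows parameter above: it lets callers restrict contig loading to the references actually being processed, instead of reading every contig into memory. A minimal usage sketch, with hypothetical file names (any ReferenceSet/FASTA plus AlignmentSet pair would do):

    from pbcore.io import AlignmentSet
    from kineticsTools.ReferenceUtils import ReferenceUtils, ReferenceWindow

    # Hypothetical inputs -- substitute a real reference and alignment set.
    alns = AlignmentSet("aligned_reads.xml", referenceFastaFname="reference.fasta")
    windows = [ReferenceWindow(refId=0, refName="contig1", start=0, end=5000)]
    contigs = ReferenceUtils.loadReferenceContigs("reference.fasta",
                                                  alignmentSet=alns,
                                                  windows=windows)
    # Only contigs named in a window are loaded; with windows=None (the
    # default) every contig is read, preserving the old behavior.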
diff --git a/kineticsTools/ResultWriter.py b/kineticsTools/ResultWriter.py
index d5e74e2..0671fcd 100755
--- a/kineticsTools/ResultWriter.py
+++ b/kineticsTools/ResultWriter.py
@@ -28,6 +28,7 @@
 # POSSIBILITY OF SUCH DAMAGE.
 #################################################################################

+from collections import namedtuple, defaultdict
 import cProfile
 import logging
 import os.path
@@ -47,6 +48,8 @@ FRAC = 'frac'
 FRAClow = 'fracLow'
 FRACup = 'fracUp'

+log = logging.getLogger(__name__)
+

 class ResultCollectorProcess(Process):

@@ -61,7 +64,7 @@ class ResultCollectorProcess(Process):
         self._resultsQueue = resultsQueue

     def _run(self):
-        logging.info("Process %s (PID=%d) started running" % (self.name, self.pid))
+        log.info("Process %s (PID=%d) started running" % (self.name, self.pid))

         self.onStart()

@@ -94,7 +97,7 @@ class ResultCollectorProcess(Process):

                     nextChunkId += 1

-        logging.info("Result thread shutting down...")
+        log.info("Result thread shutting down...")
         self.onFinish()

     def run(self):
@@ -192,23 +195,6 @@ class KineticsWriter(ResultCollectorProcess):
         except Exception as e:
             print e

-    @consumer
-    def hdf5CsvConsumer(self, filename):
-
-        grp = h5py.File(filename, "w")
-
-        y = [int(ref.Length) for ref in self.refInfo]
-        dataLength = sum(y)
-        y.append(8192)
-        chunkSize = min(dataLength, 8192 * 2)
-        # print "dataLength = ", dataLength, " chunkSize = ", chunkSize, " y = ", y
-
-        refIdDataset = grp.create_dataset('refId', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,), compression_opts=2)
-        tplDataset = grp.create_dataset('tpl', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,), compression_opts=2)
-        strandDataset = grp.create_dataset('strand', (dataLength,), dtype="u1", compression="gzip", chunks=(chunkSize,), compression_opts=2)
-
-
     @consumer
     def csvConsumer(self, filename):

@@ -293,12 +279,85 @@ class KineticsWriter(ResultCollectorProcess):
             print e

     @consumer
-    def hdf5CsvConsumer(self, filename):
+    def bigWigConsumer(self, filename):
+        import pyBigWig
+        records = []
+        records_by_pos = defaultdict(list)
+        ranges = {}
+        BaseInfo = namedtuple("BaseInfo", ("seqid", "pos", "sense", "ipd"))
+        try:
+            while True:
+                chunk = (yield)
+                if len(chunk) == 0:
+                    continue
+                # Fill out the ipd observations into the dataset
+                for x in chunk:
+                    pos = int(x['tpl']) + 1
+                    seqid = x['refName']
+                    ranges.setdefault(seqid, (sys.maxint, 0))
+                    ranges[seqid] = (min(ranges[seqid][0], pos),
+                                     max(ranges[seqid][1], pos + 1))
+                    rec = BaseInfo(
+                        seqid=seqid,
+                        pos=pos,
+                        sense=int(x['strand']),
+                        ipd=float(x['ipdRatio']))
+                    records.append(rec)
+                    records_by_pos[(rec.seqid, rec.pos)].append(rec)
+        except GeneratorExit:
+            records.sort(lambda a, b: cmp(a.pos, b.pos))
+            records.sort(lambda a, b: cmp(a.seqid, b.seqid))
+            regions = [(s, ranges[s][1] - 1) for s in sorted(ranges.keys())]
+            if len(regions) == 0:
+                with open(filename, "wb") as _:
+                    return
+            bw = pyBigWig.open(filename, "w")
+            bw.addHeader(regions)
+            k = 0
+            seqids = []
+            starts = []
+            ends = []
+            ipd_enc = []
+            # records are not necessarily consecutive or two per base!
+            have_pos = set()
+            def encode_ipds(plus, minus):
+                enc = lambda x: min(65535, int(round(100*x)))
+                return float(enc(minus) + 65536*enc(plus))
+            for rec in records:
+                if (rec.seqid, rec.pos) in have_pos:
+                    continue
+                have_pos.add((rec.seqid, rec.pos))
+                strand_records = records_by_pos[(rec.seqid, rec.pos)]
+                if len(strand_records) == 2:
+                    rec_minus = strand_records[k] if strand_records[k].sense else strand_records[k + 1]
+                    rec_plus = strand_records[k + 1] if strand_records[k].sense else strand_records[k]
+                    assert rec_plus.pos == rec_minus.pos, (rec_plus, rec_minus)
+                    seqids.append(rec_plus.seqid)
+                    starts.append(rec_plus.pos - 1)
+                    ends.append(rec_plus.pos)
+                    ipd_enc.append(encode_ipds(rec_plus.ipd, rec_minus.ipd))
+                else:
+                    seqids.append(rec.seqid)
+                    starts.append(rec.pos - 1)
+                    ends.append(rec.pos)
+                    if rec.sense == 0:
+                        ipd_enc.append(encode_ipds(rec.ipd, 0))
+                    else:
+                        ipd_enc.append(encode_ipds(0, rec.ipd))
+            log.info("Writing records for {n} bases".format(n=len(seqids)))
+            bw.addEntries(seqids, starts, ends=ends, values=ipd_enc)
+            bw.close()
+            return
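The bigWigConsumer above packs both strands into a single BigWig value per position: each IpdRatio is scaled by 100 and capped at 65535, with the minus strand in the low 16 bits and the plus strand in the high 16 bits, so precision is two decimal places and ratios above 655.35 saturate. A sketch of the matching decode, with a hypothetical output file name:

    import pyBigWig

    def decode_ipds(value):
        """Unpack the (plus, minus) IpdRatios packed by encode_ipds above."""
        s = int(value)
        ipd_minus = (s % 65536) / 100.0   # low 16 bits: minus strand
        ipd_plus = (s >> 16) / 100.0      # high 16 bits: plus strand
        return ipd_plus, ipd_minus

    bw = pyBigWig.open("basemods.bw")     # hypothetical ipdSummary --bigwig output
    for value in bw.values("contig1", 0, 10):
        print decode_ipds(value)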
+
+    @consumer
+    def hdf5CsvConsumer(self, filename):
+        """
+        Write records to an HDF5 file equivalent to the CSV output, with +/-
+        bases adjacent in the arrays.
+        """
         grp = h5py.File(filename, "w")

         y = [int(ref.Length) for ref in self.refInfo]
-        dataLength = sum(y)
+        dataLength = 2 * sum(y)
         y.append(8192)
         chunkSize = min(dataLength, 8192 * 2)
         # print "dataLength = ", dataLength, " chunkSize = ", chunkSize, " y = ", y
@@ -345,10 +404,9 @@ class KineticsWriter(ResultCollectorProcess):
                     fracUpDataset = grp[FRACup]
                     '''

-                start = min(x['tpl'] for x in chunk)
-                end = min(max(x['tpl'] for x in chunk), tplDataset.shape[0] - 1)
-
-                arrLen = end - start + 1
+                start = min(2*x['tpl'] for x in chunk)
+                end = min(max(2*x['tpl'] for x in chunk), tplDataset.shape[0] - 1)
+                arrLen = end - start + 2

                 refId = np.empty(arrLen, dtype="u4")
                 tpl = np.zeros(arrLen, dtype="u4")
@@ -368,13 +426,15 @@ class KineticsWriter(ResultCollectorProcess):
                 # Fill out the ipd observations into the dataset
                 for x in chunk:
                     # offset into the current chunk
-                    idx = x['tpl'] - start
+                    _strand = int(x['strand'])
+                    idx = (2 * x['tpl']) - start + _strand

-                    # Data points past the end of the reference can make it through -- filter them out here
+                    # Data points past the end of the reference can make it
+                    # through -- filter them out here
                     if idx < arrLen:
                         refId[idx] = int(x['refId'])
-                        tpl[idx] = int(x['tpl'])
-                        strand[idx] = int(x['strand'])
+                        tpl[idx] = int(x['tpl']) + 1  # 'tpl' is 0-based
+                        strand[idx] = _strand
                         base[idx] = x['base']
                         score[idx] = int(x['score'])
                         tMean[idx] = float(x['tMean'])
@@ -392,31 +452,41 @@ class KineticsWriter(ResultCollectorProcess):
                         fracLow[idx] = np.nan
                         fracUp[idx] = np.nan

-                refIdDataset[start:(end + 1)] = refId
-                tplDataset[start:(end + 1)] = tpl
-                strandDataset[start:(end + 1)] = strand
-                baseDataset[start:(end + 1)] = base
-                scoreDataset[start:(end + 1)] = score
-                tMeanDataset[start:(end + 1)] = tMean
-                tErrDataset[start:(end + 1)] = tErr
-                modelPredictionDataset[start:(end + 1)] = modelPrediction
-                ipdRatioDataset[start:(end + 1)] = ipdRatio
-                coverageDataset[start:(end + 1)] = coverage
+                refIdDataset[start:(end + 2)] = refId
+                tplDataset[start:(end + 2)] = tpl
+                strandDataset[start:(end + 2)] = strand
+                baseDataset[start:(end + 2)] = base
+                scoreDataset[start:(end + 2)] = score
+                tMeanDataset[start:(end + 2)] = tMean
+                tErrDataset[start:(end + 2)] = tErr
+                modelPredictionDataset[start:(end + 2)] = modelPrediction
+                ipdRatioDataset[start:(end + 2)] = ipdRatio
+                coverageDataset[start:(end + 2)] = coverage
                 if self.options.methylFraction:
-                    fracDataset[start:(end + 1)] = frac
-                    fracLowDataset[start:(end + 1)] = fracLow
-                    fracUpDataset[start:(end + 1)] = fracUp
-
+                    fracDataset[start:(end + 2)] = frac
+                    fracLowDataset[start:(end + 2)] = fracLow
+                    fracUpDataset[start:(end + 2)] = fracUp
+                grp.flush()
+                del refId
+                del tpl
+                del strand
+                del base
+                del score
+                del tMean
+                del tErr
+                del modelPrediction
+                del ipdRatio
+                del coverage

         except GeneratorExit:
             # Close down the h5 file
             grp.close()
             return
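Note the layout change in this rewritten consumer: the arrays are now twice the reference length, with the forward-strand row for template position t at index 2*t and the reverse-strand row at 2*t + 1, and 'tpl' is now written 1-based. A reader sketch under those assumptions (file name hypothetical):

    import h5py

    f = h5py.File("basemods.h5", "r")   # hypothetical ipdSummary --csv_h5 output
    fwd_ipd = f['ipdRatio'][0::2]       # even rows: forward strand (strand == 0)
    rev_ipd = f['ipdRatio'][1::2]       # odd rows: reverse strand (strand == 1)
    fwd_seq = f['base'][0::2]           # bases interleave the same way
    # 'tpl' is 1-based in this layout, so f['tpl'][2*i] == i + 1 at covered positions.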
-    # an alternative version that collects data into groups according to reference:
     @consumer
     def alt_hdf5CsvConsumer(self, filename):
         """
-        Similar to csv consumer but writing to hdf5 format.
+        Alternative HDF5 output that collects data into groups according to
+        reference (not currently enabled).
         """

         f = h5py.File(filename, "w")
@@ -424,25 +494,26 @@ class KineticsWriter(ResultCollectorProcess):
         for ref in self.refInfo:
             # Each reference group will house a collection of datasets:
-            chunkSize = min(ref.Length, 8192)
+            dataLength = ref.Length * 2
+            chunkSize = min(dataLength, 8192)

             # Create a group for each reference:
             grp = f.create_group(str(ref.Name))

-            ds = grp.create_dataset('tpl', (ref.Length,), dtype="u4", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('strand', (ref.Length,), dtype="u1", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('base', (ref.Length,), dtype="a1", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('score', (ref.Length,), dtype="u4", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('tMean', (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('tErr', (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('modelPrediction', (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('ipdRatio', (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('coverage', (ref.Length,), dtype="u4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('tpl', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('strand', (dataLength,), dtype="u1", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('base', (dataLength,), dtype="a1", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('score', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('tMean', (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('tErr', (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('modelPrediction', (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('ipdRatio', (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('coverage', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,))
             if self.options.methylFraction:
-                ds = grp.create_dataset(FRAC, (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
-                ds = grp.create_dataset(FRAClow, (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
-                ds = grp.create_dataset(FRACup, (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+                ds = grp.create_dataset(FRAC, (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+                ds = grp.create_dataset(FRAClow, (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+                ds = grp.create_dataset(FRACup, (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))

             # Maintain a dictionary of group paths?
             dsDict[ref.ID] = grp
@@ -473,10 +544,10 @@ class KineticsWriter(ResultCollectorProcess):
                     fracLowDataset = grp[FRAClow]
                     fracUpDataset = grp[FRACup]

-                start = min(x['tpl'] for x in chunk)
-                end = min(max(x['tpl'] for x in chunk), tplDataset.shape[0] - 1)
+                start = min(2*x['tpl'] for x in chunk)
+                end = min(max(2*x['tpl'] for x in chunk), tplDataset.shape[0] - 1)

-                arrLen = end - start + 1
+                arrLen = end - start + 2

                 tpl = np.zeros(arrLen, dtype="u4")
                 strand = np.zeros(arrLen, dtype="u1")
@@ -495,11 +566,12 @@ class KineticsWriter(ResultCollectorProcess):
                 # Fill out the ipd observations into the dataset
                 for x in chunk:
                     # offset into the current chunk
-                    idx = x['tpl'] - start
+                    _strand = int(x['strand'])
+                    idx = (2 * x['tpl']) - start + _strand

                     # Data points past the end of the reference can make it through -- filter them out here
                     if idx < arrLen:
-                        tpl[idx] += int(x['tpl'])
+                        tpl[idx] += int(x['tpl']) + 1
                         strand[idx] += int(x['strand'])
                         base[idx] = x['base']
                         score[idx] += int(x['score'])
@@ -519,20 +591,20 @@ class KineticsWriter(ResultCollectorProcess):
                         fracUp[idx] = np.nan

                 # Write our chunk into the main dataset
-                tplDataset[start:(end + 1)] = tpl
-                strandDataset[start:(end + 1)] = strand
-                baseDataset[start:(end + 1)] = base
-                scoreDataset[start:(end + 1)] = score
-                tMeanDataset[start:(end + 1)] = tMean
-                tErrDataset[start:(end + 1)] = tErr
-                modelPredictionDataset[start:(end + 1)] = modelPrediction
-                ipdRatioDataset[start:(end + 1)] = ipdRatio
-                coverageDataset[start:(end + 1)] = coverage
+                tplDataset[start:(end + 2)] = tpl
+                strandDataset[start:(end + 2)] = strand
+                baseDataset[start:(end + 2)] = base
+                scoreDataset[start:(end + 2)] = score
+                tMeanDataset[start:(end + 2)] = tMean
+                tErrDataset[start:(end + 2)] = tErr
+                modelPredictionDataset[start:(end + 2)] = modelPrediction
+                ipdRatioDataset[start:(end + 2)] = ipdRatio
+                coverageDataset[start:(end + 2)] = coverage
                 if self.options.methylFraction:
-                    fracDataset[start:(end + 1)] = frac
-                    fracLowDataset[start:(end + 1)] = fracLow
-                    fracUpDataset[start:(end + 1)] = fracUp
-
+                    fracDataset[start:(end + 2)] = frac
+                    fracLowDataset[start:(end + 2)] = fracLow
+                    fracUpDataset[start:(end + 2)] = fracUp
+                f.flush()

         except GeneratorExit:
             # Close down the h5 file
             f.close()
@@ -559,7 +631,7 @@ class KineticsWriter(ResultCollectorProcess):

         for ref in self.refInfo:
             # FIXME -- create with good chunk parameters, activate compression
-            logging.info("Creating IpdRatio dataset w/ name: %s, Size: %d" % (str(ref.Name), ref.Length))
+            log.info("Creating IpdRatio dataset w/ name: %s, Size: %d" % (str(ref.Name), ref.Length))

             chunkSize = min(ref.Length, 8192)
@@ -835,10 +907,11 @@ class KineticsWriter(ResultCollectorProcess):
             ('m5Cgff', 'm5C.gff', self.m5CgffConsumer),
             ('gff', 'gff', self.gffConsumer),
             ('csv', 'csv', self.csvConsumer),
+            ('bigwig', 'bw', self.bigWigConsumer),
             ('ms_csv', 'ms.csv', self.msCsvConsumer),
             ('pickle', 'pickle', self.csvConsumer),
             ('summary_h5', 'summary.h5', self.ipdRatioH5Consumer),
-            ('csv_h5', 'h5', self.hdf5CsvConsumer)
+            ('csv_h5', 'h5', self.alt_hdf5CsvConsumer)
         ]
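Each tuple in the table above wires an output option name and file extension to a consumer coroutine. The @consumer decorator used throughout this file is not shown in the diff; it presumably follows the standard primed-generator pattern, sketched here in minimal standalone form (an illustration, not the actual kineticsTools decorator; Python 2 syntax to match the codebase):

    def consumer(func):
        """Prime a generator so it is immediately ready to receive send()."""
        def start(*args, **kwargs):
            gen = func(*args, **kwargs)
            gen.next()   # advance to the first (yield)
            return gen
        return start

    @consumer
    def echoConsumer():
        try:
            while True:
                chunk = (yield)   # each send() delivers one chunk of results
                print len(chunk)
        except GeneratorExit:
            pass                  # close() triggers cleanup, like the flushes above

    sink = echoConsumer()
    sink.send([1, 2, 3])   # prints 3
    sink.close()           # raises GeneratorExit inside the coroutine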
         sinkList = []
@@ -850,7 +923,15 @@ class KineticsWriter(ResultCollectorProcess):

             # The 'outfile' argument causes all outputs to be generated
             if self.options.outfile:
-                name = self.options.outfile + '.' + ext
+                if ext == "bw":
+                    try:
+                        import pyBigWig
+                    except ImportError:
+                        pass
+                    else:
+                        name = self.options.outfile + '.' + ext
+                else:
+                    name = self.options.outfile + '.' + ext

             # Individual outputs can be specified - these filenames override the default
             if self.options.__getattribute__(fileType):

diff --git a/kineticsTools/ipdSummary.py b/kineticsTools/ipdSummary.py
index 5c52c54..6bfd088 100755
--- a/kineticsTools/ipdSummary.py
+++ b/kineticsTools/ipdSummary.py
@@ -61,12 +61,9 @@ from kineticsTools.ResultWriter import KineticsWriter
 from kineticsTools.ipdModel import IpdModel
 from kineticsTools.ReferenceUtils import ReferenceUtils

-# Version info
-__p4revision__ = "$Revision: #1 $"
-__p4change__ = "$Change: 100972 $"
-revNum = int(__p4revision__.strip("$").split(" ")[1].strip("#"))
-changeNum = int(__p4change__.strip("$").split(":")[-1])
-__version__ = "2.2"
+__version__ = "2.3"
+
+log = logging.getLogger(__name__)

 class Constants(object):
     TOOL_ID = "kinetics_tools.tasks.ipd_summary"
@@ -129,17 +126,23 @@ def get_parser():
                             type=validateFile,
                             help="Fasta or Reference DataSet")
     # XXX GFF and CSV are "option" for arg parser, not tool contract
     tcp.add_output_file_type(FileTypes.GFF, "gff",
-                             name="GFF file",
+                             name="Modified bases GFF",
                              description="GFF file of modified bases",
                              default_name="basemods")
-    tcp.add_output_file_type(FileTypes.CSV, "csv",
-                             name="CSV file",
-                             description="CSV file of per-nucleotide information",
+    tcp.add_output_file_type(FileTypes.BIGWIG, "bigwig",
+                             name="BigWig file encoding base IpdRatios",
+                             description="Compressed binary format containing the IpdRatios for every base (both strands)",
+                             default_name="basemods")
+    tcp.add_output_file_type(FileTypes.H5, "csv_h5",
+                             name="HDF5 file containing per-base information",
+                             description="HDF5 equivalent of CSV output",
                              default_name="basemods")
     argp.add_argument("--gff", action="store", default=None,
                       help="Output GFF file of modified bases")
     argp.add_argument("--csv", action="store", default=None,
                       help="Output CSV file of per-nucleotide information")
+    argp.add_argument("--bigwig", action="store", default=None,
+                      help="Output BigWig file encoding IpdRatio for both strands")
     # FIXME use central --nproc option
     argp.add_argument('--numWorkers', '-j',
                       dest='numWorkers',
@@ -372,13 +375,18 @@ def _get_more_options(parser):
                         default=None,
                         help="Random seed (for development and debugging purposes only)")

+    parser.add_argument("--referenceStride", action="store", type=int,
+                        default=1000,
+                        help="Size of reference window in internal "
+                             "parallelization. For testing purposes only.")
+
     return parser


 class KineticsToolsRunner(object):

     def __init__(self, args):
         self.args = args
-        self._sharedAlignmentSet = None
+        self.alignments = None

     def start(self):
         self.validateArgs()
@@ -508,7 +516,7 @@ class KineticsToolsRunner(object):
         for i in xrange(self.options.numWorkers):
             p = WorkerType(self.options, self._workQueue, self._resultsQueue,
                            self.ipdModel,
-                           sharedAlignmentSet=self._sharedAlignmentSet)
+                           sharedAlignmentSet=self.alignments)
             self._workers.append(p)
             p.start()
         logging.info("Launched worker processes.")
@@ -532,11 +540,11 @@ class KineticsToolsRunner(object):
             pass

     def loadReferenceAndModel(self, referencePath):
-        assert self._sharedAlignmentSet is not None
+        assert self.alignments is not None and self.referenceWindows is not None
         # Load the reference contigs - annotated with their refID from the cmp.h5
         logging.info("Loading reference contigs %s" % referencePath)
         contigs = ReferenceUtils.loadReferenceContigs(referencePath,
-            alignmentSet=self._sharedAlignmentSet)
+            alignmentSet=self.alignments, windows=self.referenceWindows)

         # There are three different ways the ipdModel can be loaded.
         # In order of precedence they are:
@@ -558,8 +566,7 @@ class KineticsToolsRunner(object):
             logging.error("Params path doesn't exist: %s" % self.args.paramsPath)
             sys.exit(1)

-        majorityChem = ReferenceUtils.loadAlignmentChemistry(
-            self._sharedAlignmentSet)
+        majorityChem = ReferenceUtils.loadAlignmentChemistry(self.alignments)
         ipdModel = os.path.join(self.args.paramsPath, majorityChem + ".h5")
         if majorityChem == 'unknown':
             logging.error("Chemistry cannot be identified---cannot perform kinetic analysis")
@@ -580,11 +587,11 @@ class KineticsToolsRunner(object):
         """
         logging.info("Reading AlignmentSet: %s" % cmpH5Filename)
         logging.info("           reference: %s" % self.args.reference)
-        self._sharedAlignmentSet = AlignmentSet(cmpH5Filename,
-                                                referenceFastaFname=self.args.reference)
+        self.alignments = AlignmentSet(cmpH5Filename,
+                                       referenceFastaFname=self.args.reference)
         # XXX this should ensure that the file(s) get opened, including any
         # .pbi indices - but need to confirm this
-        self.refInfo = self._sharedAlignmentSet.referenceInfoTable
+        self.refInfo = self.alignments.referenceInfoTable

     def _mainLoop(self):
         """
@@ -603,31 +610,16 @@ class KineticsToolsRunner(object):
         # interpreter crashes sometimes.  See Bug 19704.  Since we
         # don't leak garbage cycles, disabling the cyclic GC is
         # essentially harmless.
-        gc.disable()
+        #gc.disable()

-        # Load a copy of the cmpH5 alignment index to share with the slaves
         self.loadSharedAlignmentSet(self.args.alignment_set)

-        # Load reference and IpdModel
-        self.loadReferenceAndModel(self.args.reference)
-
-        # Spawn workers
-        self._launchSlaveProcesses()
-
-        # WARNING -- cmp.h5 file must be opened AFTER worker processes have been spawned
-        # cmp.h5 we're using -- use this to orchestrate the work
-        self.cmph5 = self._sharedAlignmentSet
-
-        logging.info('Generating kinetics summary for [%s]' % self.args.alignment_set)
-
-        #self.referenceMap = self.cmph5['/RefGroup'].asDict('RefInfoID', 'ID')
-        #self.alnInfo = self.cmph5['/AlnInfo'].asRecArray()
-
         # Resolve the windows that will be visited.
         if self.args.referenceWindowsAsString is not None:
             self.referenceWindows = []
             for s in self.args.referenceWindowsAsString.split(","):
                 try:
-                    win = ReferenceUtils.parseReferenceWindow(s, self.cmph5.referenceInfo)
+                    win = ReferenceUtils.parseReferenceWindow(s, self.alignments.referenceInfo)
                     self.referenceWindows.append(win)
                 except:
                     if self.args.skipUnrecognizedContigs:
                         continue
                     else:
                         raise Exception, "Unrecognized contig!"
         elif self.args.referenceWindowsFromAlignment:
-            self.referenceWindows = ReferenceUtils.referenceWindowsFromAlignment(self._sharedAlignmentSet, self.cmph5.referenceInfo)
+            self.referenceWindows = ReferenceUtils.referenceWindowsFromAlignment(self.alignments, self.alignments.referenceInfo)
+            refNames = set([rw.refName for rw in self.referenceWindows])
+            # limit output to contigs that overlap with reference windows
+            self.refInfo = [r for r in self.refInfo if r.Name in refNames]
         else:
             self.referenceWindows = ReferenceUtils.createReferenceWindows(
                 self.refInfo)

+        # Load reference and IpdModel
+        self.loadReferenceAndModel(self.args.reference)
+
+        # Spawn workers
+        self._launchSlaveProcesses()
+
+        logging.info('Generating kinetics summary for [%s]' % self.args.alignment_set)
+
+        #self.referenceMap = self.alignments['/RefGroup'].asDict('RefInfoID', 'ID')
+        #self.alnInfo = self.alignments['/AlnInfo'].asRecArray()
+
         # Main loop -- we loop over ReferenceGroups in the cmp.h5.  For each contig we will:
         # 1. Load the sequence into the main memory of the parent process
         # 2. Fork the workers
@@ -650,7 +656,7 @@ class KineticsToolsRunner(object):
         # Iterate over references
         for window in self.referenceWindows:
             logging.info('Processing window/contig: %s' % (window,))
-            for chunk in ReferenceUtils.enumerateChunks(1000, window):
+            for chunk in ReferenceUtils.enumerateChunks(self.args.referenceStride, window):
                 self._workQueue.put((self.workChunkCounter, chunk))
                 self.workChunkCounter += 1
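The hard-coded 1000 bp chunk size above is now just the default of the new --referenceStride option. For illustration, window chunking of the kind enumerateChunks performs looks roughly like this (a sketch, not the actual ReferenceUtils implementation):

    def enumerate_chunks(stride, start, end):
        """Yield (start, end) sub-windows of at most `stride` bases."""
        pos = start
        while pos < end:
            yield (pos, min(pos + stride, end))
            pos += stride

    # A 5000-base window at the default stride of 1000 yields five work chunks:
    print list(enumerate_chunks(1000, 0, 5000))
    # [(0, 1000), (1000, 2000), (2000, 3000), (3000, 4000), (4000, 5000)]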
@@ -667,7 +673,7 @@ class KineticsToolsRunner(object):
         self._resultsQueue.join()
         self._resultCollectorProcess.join()
         logging.info("ipdSummary.py finished. Exiting.")
-        del self.cmph5
+        self.alignments.close()
         return 0
@@ -715,12 +721,14 @@ def resolved_tool_contract_runner(resolved_contract):
     alignment_path = rc.task.input_files[0]
     reference_path = rc.task.input_files[1]
     gff_path = rc.task.output_files[0]
-    csv_path = rc.task.output_files[1]
+    bigwig_path = rc.task.output_files[1]
+    h5_path = rc.task.output_files[2]
     args = [
         alignment_path,
         "--reference", reference_path,
         "--gff", gff_path,
-        "--csv", csv_path,
+        "--bigwig", bigwig_path,
+        "--csv_h5", h5_path,
         "--numWorkers", str(rc.task.nproc),
         "--pvalue", str(rc.task.options[Constants.PVALUE_ID]),
         "--alignmentSetRefWindows",
@@ -737,6 +745,7 @@ def resolved_tool_contract_runner(resolved_contract):
         args.extend([
             "--identify", rc.task.options[Constants.IDENTIFY_ID],
         ])
+    log.info("Converted arguments: ipdSummary {c}".format(c=" ".join(args)))
     args_ = get_parser().arg_parser.parser.parse_args(args)
     return args_runner(args_)

diff --git a/kineticsTools/summarizeModifications.py b/kineticsTools/summarizeModifications.py
index deb2768..2f2c669 100755
--- a/kineticsTools/summarizeModifications.py
+++ b/kineticsTools/summarizeModifications.py
@@ -178,7 +178,7 @@ def get_parser():
                           name="GFF file",
                           description="Alignment summary GFF")
     p.add_output_file_type(FileTypes.GFF, "gff_out",
-                           name="GFF file",
+                           name="Alignment summary with basemods",
                            description="Modified alignment summary file",
                            default_name="alignment_summary_with_basemods")
     return p

diff --git a/kineticsTools/tasks/__init__.py b/kineticsTools/tasks/__init__.py
new file mode 100644
index 0000000..e69de29

diff --git a/kineticsTools/tasks/gather_kinetics_h5.py b/kineticsTools/tasks/gather_kinetics_h5.py
new file mode 100644
index 0000000..76703f5
--- /dev/null
+++ b/kineticsTools/tasks/gather_kinetics_h5.py
@@ -0,0 +1,172 @@
+
+"""
+pbsmrtpipe task to gather chunked hdf5 files containing raw kinetics data.
+"""
+
+
+from collections import defaultdict
+import logging
+import sys
+
+import h5py
+
+from pbcommand.pb_io.common import load_pipeline_chunks_from_json
+from pbcommand.cli import pbparser_runner
+from pbcommand.models import get_gather_pbparser, FileTypes
+from pbcommand.utils import setup_log
+
+log = logging.getLogger(__name__)
+
+
+class Constants(object):
+    TASK_ID = "kinetics_tools.tasks.gather_kinetics_h5"
+    VERSION = "0.1.0"
+    DRIVER_EXE = "python -m kineticsTools.tasks.gather_kinetics_h5 --resolved-tool-contract "
+    CHUNK_KEY = "$chunk.h5_id"
+    OPT_CHUNK_KEY = "kinetics_tools.tasks.gather_h5_chunk_key"
+
+
+def get_parser():
+    p = get_gather_pbparser(Constants.TASK_ID,
+                            Constants.VERSION,
+                            "Dev Kinetics HDF5 Gather",
+                            "General Chunk Kinetics HDF5 Gather",
+                            Constants.DRIVER_EXE,
+                            is_distributed=True,
+                            default_level=logging.INFO)
+    p.add_input_file_type(FileTypes.CHUNK, "cjson_in", "GCHUNK Json",
+                          "Gathered CHUNK Json with BigWig chunk key")
+
+    p.add_output_file_type(FileTypes.H5, "h5_out",
+                           "Kinetics HDF5 file",
+                           "Gathered HDF5 file", "gathered")
+
+    # Only need to add to argparse layer for the commandline
+    p.arg_parser.add_str(Constants.OPT_CHUNK_KEY,
+                         "chunk_key",
+                         Constants.CHUNK_KEY,
+                         "Chunk key",
+                         "Chunk key to use (format $chunk.{chunk-key})")
+
+    return p
+
+
+def gather_kinetics_h5(chunked_files, output_file):
+    """
+    Gather a set of 'flat' kinetics hdf5 chunks.  This will not scale well for
+    large (i.e. non-bacterial) genomes.
+    """
+    log.info("creating {f}...".format(f=output_file))
+    out = h5py.File(output_file, "w")
+    first = h5py.File(chunked_files[0])
+    dataLength = len(first[first.keys()[0]])
+    chunkSize = min(dataLength, 8192 * 8)
+    datasets = {}
+    def _add_chunk(chunk):
+        # FIXME this is insanely inefficient
+        log.debug("  getting mask")
+        mask = chunk['base'].__array__() != ''
+        for key in datasets.keys():
+            log.debug("  copying '{k}'".format(k=key))
+            datasets[key][mask] = chunk[key][mask]
+        del mask
+    for key in first.keys():
+        ds = out.create_dataset(key, (dataLength,),
+                                dtype=first[key].dtype,
+                                compression="gzip",
+                                chunks=(chunkSize,),
+                                compression_opts=2)
+        datasets[key] = ds
+    log.info("adding first chunk in {f}".format(f=chunked_files[0]))
+    _add_chunk(first)
+    out.flush()
+    for file_name in chunked_files[1:]:
+        #out = h5py.File(output_file, "r+")
+        chunk = h5py.File(file_name)
+        log.info("adding chunk in {f}".format(f=file_name))
+        _add_chunk(chunk)
+    out.close()
+    return 0
+
+
+def gather_kinetics_h5_byref(chunked_files, output_file):
+    """
+    Gather a set of hdf5 files containing per-reference datasets.  This
+    implementation sacrifices some computational efficiency to limit the
+    memory overhead.
+    """
+    log.info("creating {f}...".format(f=output_file))
+    out = h5py.File(output_file, "w")
+    first = h5py.File(chunked_files[0])
+    refs_by_file = defaultdict(list)
+    ds_types = {}
+    ref_sizes = {}
+    for file_name in chunked_files:
+        chunk = h5py.File(file_name)
+        for rname in chunk.keys():
+            refs_by_file[rname].append(file_name)
+            grp = chunk[rname]
+            for ds_id in grp.keys():
+                if not ds_id in ds_types:
+                    ds_types[ds_id] = grp[ds_id].dtype
+                else:
+                    assert ds_types[ds_id] == grp[ds_id].dtype
+                if not rname in ref_sizes:
+                    ref_sizes[rname] = len(grp[ds_id])
+                else:
+                    assert ref_sizes[rname] == len(grp[ds_id])
+        chunk.close()
+    for ref_name, file_names in refs_by_file.iteritems():
+        log.info("creating group {r} (dataset length {s})".format(
+            r=ref_name,
+            s=ref_sizes[ref_name]))
+        grp = out.create_group(ref_name)
+        dataLength = ref_sizes[ref_name]
+        chunkSize = min(dataLength, 8192)
+        for ds_id, ds_type in ds_types.iteritems():
+            log.debug("  dataset {i} ({t})".format(i=ds_id, t=ds_type))
+            ds = grp.create_dataset(ds_id, (dataLength,), dtype=ds_type,
+                                    compression="gzip", chunks=(chunkSize,))
+            for file_name in file_names:
+                log.debug("    reading from {f}".format(f=file_name))
+                chunk = h5py.File(file_name)
+                grp_chk = chunk[ref_name]
+                mask = grp_chk['base'].__array__() != ''
+                ds[mask] = grp_chk[ds_id][mask]
+                del mask
+                chunk.close()
+        out.flush()
+    out.close()
+    return 0
+
+
+def _run_main(chunk_input_json, output_file, chunk_key):
+    chunks = load_pipeline_chunks_from_json(chunk_input_json)
+    chunked_files = []
+    for chunk in chunks:
+        if chunk_key in chunk.chunk_keys:
+            chunked_files.append(chunk.chunk_d[chunk_key])
+        else:
+            raise KeyError("Unable to find chunk key '{i}' in {p}".format(i=chunk_key, p=chunk))
+    return gather_kinetics_h5_byref(chunked_files, output_file)
+
+
+def args_runner(args):
+    return _run_main(args.cjson_in, args.h5_out, args.chunk_key)
+
+
+def rtc_runner(rtc):
+    return _run_main(rtc.task.input_files[0], rtc.task.output_files[0], rtc.task.chunk_key)
+
+
+def main(argv=sys.argv):
+    return pbparser_runner(argv[1:],
+                           get_parser(),
+                           args_runner,
+                           rtc_runner,
+                           log,
+                           setup_log)
+
+
+if __name__ == '__main__':
+    sys.exit(main())
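Both gather variants above merge chunks the same way: rows a chunk never wrote still hold the empty default in its 'base' dataset, so a boolean mask picks out exactly the rows that chunk filled. A toy numpy illustration with made-up data:

    import numpy as np

    merged = np.zeros(8, dtype="u4")
    chunk_base = np.array(['A', 'C', '', '', 'G', '', '', ''])  # '' = untouched
    chunk_score = np.array([7, 9, 0, 0, 5, 0, 0, 0], dtype="u4")

    mask = chunk_base != ''          # rows this chunk actually wrote
    merged[mask] = chunk_score[mask]
    print merged                     # [7 9 0 0 5 0 0 0]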
diff --git a/requirements-ci.txt b/requirements-ci.txt
index a43e0c4..002d11d 100644
--- a/requirements-ci.txt
+++ b/requirements-ci.txt
@@ -1,12 +1,13 @@
 cython
 numpy
-h5py
+h5py >= 1.3.0
 jinja2
 networkx
 jsonschema
 xmlbuilder
 functools32
 pyxb
+pyBigWig
 # Install from github
 -e git://github.com/PacificBiosciences/pbcore.git@master#egg=pbcore
 -e git://github.com/PacificBiosciences/pbcommand.git#egg=pbcommand

diff --git a/requirements-dev.txt b/requirements-dev.txt
index 80e9144..0af0380 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,3 +1,2 @@
--r requirements.txt
 sphinx
 nose

diff --git a/setup.py b/setup.py
index bc1f56b..e1eb4bf 100755
--- a/setup.py
+++ b/setup.py
@@ -4,12 +4,11 @@ import sys

 setup(
     name='kineticsTools',
-    version='0.5.2',
+    version='0.6.0',
     author='Pacific Biosciences',
     author_email='[email protected]',
     license=open('LICENSES.txt').read(),
-    packages=find_packages('.'),
-    package_dir={'': '.'},
+    packages=find_packages("."),
     package_data={'kineticsTools': ['resources/*.h5']},
     ext_modules=[Extension('kineticsTools/tree_predict', ['kineticsTools/tree_predict.c'],
                            extra_compile_args=["-O3", "-shared", "-std=c99"],
@@ -21,6 +20,7 @@ setup(
         'h5py >= 1.3.0',
         'scipy >= 0.9.0',
         'pbcommand >= 0.3.22',
+        #'pyBigWig'
     ],
     entry_points={'console_scripts': [
         "ipdSummary = kineticsTools.ipdSummary:main",

diff --git a/test/cram/extra/bigwig.t b/test/cram/extra/bigwig.t
new file mode 100644
index 0000000..5742c29
--- /dev/null
+++ b/test/cram/extra/bigwig.t
@@ -0,0 +1,15 @@
+Test support for BigWig output of IpdRatio.
+
+  $ . $TESTDIR/../portability.sh
+
+Load in data:
+
+  $ DATA=/pbi/dept/secondary/siv/testdata/kineticsTools
+  $ INPUT=$DATA/Hpyl_1_5000.xml
+  $ REFERENCE=/pbi/dept/secondary/siv/references/Helicobacter_pylori_J99/sequence/Helicobacter_pylori_J99.fasta
+
+Run basic ipdSummary:
+
+  $ ipdSummary --log-level=WARNING --bigwig tmp1.bw --numWorkers 12 --pvalue 0.001 --identify m6A,m4C --reference $REFERENCE --referenceWindows="gi|12057207|gb|AE001439.1|:0-5000" $INPUT
+  $ ls tmp1.bw
+  tmp1.bw

diff --git a/test/cram/version.t b/test/cram/version.t
index ef00278..91ce3dd 100644
--- a/test/cram/version.t
+++ b/test/cram/version.t
@@ -9,14 +9,15 @@ A simple test of the version and help options:
                     [--log-file LOG_FILE]
                     [--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL} | --debug | --quiet | -v]
                     --reference REFERENCE [--gff GFF] [--csv CSV]
-                    [--numWorkers NUMWORKERS] [--pvalue PVALUE]
-                    [--maxLength MAXLENGTH] [--identify IDENTIFY]
-                    [--methylFraction] [--outfile OUTFILE] [--m5Cgff M5CGFF]
-                    [--m5Cclassifer M5CCLASSIFIER] [--csv_h5 CSV_H5]
-                    [--pickle PICKLE] [--summary_h5 SUMMARY_H5]
-                    [--ms_csv MS_CSV] [--control CONTROL] [--useLDA]
-                    [--paramsPath PARAMSPATH] [--minCoverage MINCOVERAGE]
-                    [--maxQueueSize MAXQUEUESIZE] [--maxCoverage MAXCOVERAGE]
+                    [--bigwig BIGWIG] [--numWorkers NUMWORKERS]
+                    [--pvalue PVALUE] [--maxLength MAXLENGTH]
+                    [--identify IDENTIFY] [--methylFraction] [--outfile OUTFILE]
+                    [--m5Cgff M5CGFF] [--m5Cclassifer M5CCLASSIFIER]
+                    [--csv_h5 CSV_H5] [--pickle PICKLE]
+                    [--summary_h5 SUMMARY_H5] [--ms_csv MS_CSV]
+                    [--control CONTROL] [--useLDA] [--paramsPath PARAMSPATH]
+                    [--minCoverage MINCOVERAGE] [--maxQueueSize MAXQUEUESIZE]
+                    [--maxCoverage MAXCOVERAGE]
                     [--mapQvThreshold MAPQVTHRESHOLD] [--ipdModel IPDMODEL]
                     [--modelIters MODELITERS]
                     [--cap_percentile CAP_PERCENTILE]
                     [--methylMinCov METHYLMINCOV]
@@ -27,7 +28,7 @@ A simple test of the version and help options:
                     [-W REFERENCEWINDOWSASSTRING]
                     [--skipUnrecognizedContigs SKIPUNRECOGNIZEDCONTIGS]
                     [--alignmentSetRefWindows] [--threaded] [--profile] [--pdb]
-                    [--seed RANDOMSEED]
+                    [--seed RANDOMSEED] [--referenceStride REFERENCESTRIDE]
                     alignment_set
   ipdSummary: error: too few arguments
   [2]

diff --git a/test/test_gather_h5.py b/test/test_gather_h5.py
new file mode 100644
index 0000000..954fa26
--- /dev/null
+++ b/test/test_gather_h5.py
@@ -0,0 +1,77 @@
+
+import unittest
+import tempfile
+
+import numpy as np
+import h5py
+
+import pbcommand.testkit.core
+from pbcommand.models import PipelineChunk
+from pbcommand.pb_io.common import write_pipeline_chunks
+
+from kineticsTools.tasks.gather_kinetics_h5 import gather_kinetics_h5
+
+
+class SetUpHDF5(object):
+    DATALENGTH = 10000
+    CHUNKSIZE = 2048
+    CHUNKED_FILES = [
+        tempfile.NamedTemporaryFile(suffix=".h5").name,
+        tempfile.NamedTemporaryFile(suffix=".h5").name
+    ]
+
+    @classmethod
+    def makeInputs(cls):
+        for k, ifn in enumerate(cls.CHUNKED_FILES):
+            f = h5py.File(cls.CHUNKED_FILES[k], "w")
+            a = f.create_dataset("base", (cls.DATALENGTH,),
+                                 dtype="a1", compression="gzip",
+                                 chunks=(cls.CHUNKSIZE,), compression_opts=2)
+            b = f.create_dataset("score", (cls.DATALENGTH,),
+                                 dtype="u4", compression="gzip",
+                                 chunks=(cls.CHUNKSIZE,), compression_opts=2)
+            c = f.create_dataset("tMean", (cls.DATALENGTH,),
+                                 dtype="f4", compression="gzip",
+                                 chunks=(cls.CHUNKSIZE,), compression_opts=2)
+            start = k * (cls.DATALENGTH / 2)
+            end = (k + 1) * (cls.DATALENGTH / 2)
+            a[start:end] = np.array(['A' for x in range(start, end)])
+            b[start:end] = np.array([2*(k+1) for x in range(start, end)])
+            c[start:end] = np.sqrt(np.array(range(start+1, end+1)))
+            f.close()
+
+
+class TestGatherHDF5(unittest.TestCase, SetUpHDF5):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.makeInputs()
+
+    def test_gather_kinetics_h5(self):
+        ofn = tempfile.NamedTemporaryFile(suffix=".h5").name
+        gather_kinetics_h5(self.CHUNKED_FILES, ofn)
+        f = h5py.File(ofn)
+        self.assertTrue(all(f['base'].__array__() == 'A'))
+        self.assertTrue(all(f['score'].__array__() > 0))
+        self.assertEqual(f['score'].__array__().mean(), 3)
+        self.assertTrue(all(f['tMean'].__array__() > 0))
+        d = np.round(f['tMean'].__array__() ** 2).astype("u4")
+        self.assertTrue(all(d == np.array(range(1, self.DATALENGTH+1))))
+
+
+class TestGatherH5ToolContract(pbcommand.testkit.core.PbTestGatherApp, SetUpHDF5):
+    DRIVER_BASE = "python -m kineticsTools.tasks.gather_kinetics_h5 "
+    INPUT_FILES = [tempfile.NamedTemporaryFile(suffix=".json").name]
+    CHUNK_KEY = "$chunk.h5_id"
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestGatherH5ToolContract, cls).setUpClass()
+        cls.makeInputs()
+        chunks = [PipelineChunk(chunk_id="chunk_data_{i}".format(i=i),
+                                **({cls.CHUNK_KEY: fn}))
+                  for i, fn in enumerate(cls.CHUNKED_FILES)]
+        write_pipeline_chunks(chunks, cls.INPUT_FILES[0], None)
+
+
+if __name__ == "__main__":
+    unittest.main()
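For context on the assertions in TestGatherHDF5 above: makeInputs has chunk 0 fill the first half of 'score' with 2s and chunk 1 fill the second half with 4s, so the gathered array should average to exactly 3:

    import numpy as np

    # Mirrors the two gathered halves written by makeInputs (illustrative data).
    score = np.concatenate([np.repeat(2, 5000), np.repeat(4, 5000)])
    assert score.mean() == 3  # matches assertEqual(f['score'].__array__().mean(), 3)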
diff --git a/test/test_outputs.py b/test/test_outputs.py
new file mode 100644
index 0000000..03430de
--- /dev/null
+++ b/test/test_outputs.py
@@ -0,0 +1,105 @@
+
+"""
+Test sanity of various output formats for a minimal real-world example.
+"""
+
+import subprocess
+import tempfile
+import unittest
+import os.path
+import csv
+import re
+
+import h5py
+
+
+os.environ["PACBIO_TEST_ENV"] = "1"  # turns off --verbose
+
+DATA_DIR = "/pbi/dept/secondary/siv/testdata/kineticsTools"
+REF_DIR = "/pbi/dept/secondary/siv/references/Helicobacter_pylori_J99"
+ALIGNMENTS = os.path.join(DATA_DIR, "Hpyl_1_5000.xml")
+REFERENCE = os.path.join(REF_DIR, "sequence", "Helicobacter_pylori_J99.fasta")
+
+skip_if_no_data = unittest.skipUnless(
+    os.path.isdir(DATA_DIR) and os.path.isdir(REF_DIR),
+    "%s or %s not available" % (DATA_DIR, REF_DIR))
+
+
+@skip_if_no_data
+class TestOutputs(unittest.TestCase):
+
+    @classmethod
+    @skip_if_no_data
+    def setUpClass(cls):
+        prefix = tempfile.NamedTemporaryFile().name
+        cls.h5_file = "{p}.h5".format(p=prefix)
+        cls.csv_file = "{p}.csv".format(p=prefix)
+        cls.gff_file = "{p}.gff".format(p=prefix)
+        cls.bw_file = "{p}.bw".format(p=prefix)
+        args = [
+            "ipdSummary", "--log-level", "WARNING",
+            "--csv_h5", cls.h5_file,
+            "--csv", cls.csv_file,
+            "--gff", cls.gff_file,
+            "--bigwig", cls.bw_file,
+            "--numWorkers", "12",
+            "--pvalue", "0.001",
+            "--identify", "m6A,m4C",
+            "--referenceStride", "100",
+            "--referenceWindows", "gi|12057207|gb|AE001439.1|:0-200",
+            "--reference", REFERENCE,
+            ALIGNMENTS
+        ]
+        print " ".join(args)
+        assert subprocess.call(args) == 0
+        with open(cls.csv_file) as f:
+            cls.csv_records = [l.split(",") for l in f.read().splitlines()][1:]
+
+    @classmethod
+    def tearDownClass(cls):
+        return
+        for fn in [cls.h5_file, cls.csv_file, cls.gff_file, cls.bw_file]:
+            if os.path.exists(fn):
+                os.remove(fn)
+
+    def test_h5_output(self):
+        f = h5py.File(self.h5_file)
+        bases = f['base'].__array__()
+        self.assertEqual(bases[0], "A")
+        self.assertEqual(bases[100], "T")
+        self.assertEqual(list(bases[0:400] != "").count(True), 400)
+        seqh_fwd = ''.join([f['base'][x*2] for x in range(200)])
+        seqc_fwd = ''.join([self.csv_records[x*2][3] for x in range(200)])
+        self.assertEqual(seqh_fwd, seqc_fwd)
+        seqh_rev = ''.join([f['base'][(x*2)+1] for x in range(200)])
+        seqc_rev = ''.join([self.csv_records[(x*2)+1][3] for x in range(200)])
+        self.assertEqual(seqh_rev, seqc_rev)
+        tpl_fwd = [f['tpl'][x*2] for x in range(200)]
+        self.assertEqual(tpl_fwd, range(1, 201))
+        f = h5py.File(self.h5_file)
+        for i_rec, rec in enumerate(self.csv_records):
+            self.assertEqual(str(f['tpl'][i_rec]), self.csv_records[i_rec][1])
+            self.assertEqual("%.3f" % f['ipdRatio'][i_rec],
+                             self.csv_records[i_rec][8])
+
+    def test_csv_output(self):
+        self.assertEqual(len(self.csv_records), 400)
+        self.assertEqual(self.csv_records[0][3], "A")
+        self.assertEqual(self.csv_records[100][3], "T")
+
+    def test_bigwig(self):
+        import pyBigWig
+        f = pyBigWig.open(self.bw_file)
+        for i_rec, rec in enumerate(self.csv_records):
+            seqid = re.sub('\"', "", rec[0])
+            tpl = int(rec[1]) - 1
+            s = int(f.values(seqid, tpl, tpl+1)[0])
+            ipd_minus = (s % 65536) / 100.0
+            ipd_plus = (s >> 16) / 100.0
+            if rec[2] == "1":
+                self.assertAlmostEqual(ipd_minus, float(rec[8]), places=1)
+            else:
+                self.assertAlmostEqual(ipd_plus, float(rec[8]), places=1)
+
+
+if __name__ == "__main__":
+    unittest.main()

diff --git a/test/test_tool_contract.py b/test/test_tool_contract.py
index 5056b24..023d585 100755
--- a/test/test_tool_contract.py
+++ b/test/test_tool_contract.py
@@ -9,6 +9,8 @@ import logging
 import os.path
 import csv

+import h5py
+
 import pbcommand.testkit

 os.environ["PACBIO_TEST_ENV"] = "1"  # turns off --verbose
@@ -29,6 +31,7 @@
 gi|12057207|gb|AE001439.1|\tkinModCall\tm6A\t35\t35\t187\t-\t.\tcoverage=118;con
 gi|12057207|gb|AE001439.1|\tkinModCall\tm4C\t60\t60\t49\t-\t.\tcoverage=112;context=AAAAAGCTCGCTCAAAAACCCTTGATTTAAGGGCGTTTTAT;IPDRatio=2.58;identificationQv=33
 gi|12057207|gb|AE001439.1|\tkinModCall\tm6A\t89\t89\t223\t+\t.\tcoverage=139;context=AGCGAGCTTTTTGCTCAAAGAATCCAAGATAGCGTTTAAAA;IPDRatio=5.69;identificationQv=187"""

+# FIXME this is much too slow
 @unittest.skipUnless(os.path.isdir(DATA_DIR) and os.path.isdir(REF_DIR),
                      "%s or %s not available" % (DATA_DIR, REF_DIR))
 class TestIpdSummary(pbcommand.testkit.PbTestApp):
@@ -49,27 +52,29 @@ class TestIpdSummary(pbcommand.testkit.PbTestApp):

     def run_after(self, rtc, output_dir):
         gff_file = os.path.join(output_dir, rtc.task.output_files[0])
-        csv_file = os.path.join(output_dir, rtc.task.output_files[1])
         def lc(fn): return len(open(fn).readlines())
         self.assertEqual(lc(gff_file), Constants.N_LINES_GFF)
-        self.assertEqual(lc(csv_file), Constants.N_LINES_CSV)
-        def head(fn, n): return "\n".join(open(fn).read().splitlines()[0:n])
-        self.assertEqual(head(csv_file, 3), Constants.INITIAL_LINES_CSV)
         def head2(fn, n):
             out = []
-            i = 0
             for line in open(fn).read().splitlines():
                 if line[0] != '#':
                     out.append(line)
-                    i += 1
-                    if i == n:
+                    if len(out) == n:
                         break
             return "\n".join(out)
         self.assertEqual(head2(gff_file, 3), Constants.INITIAL_LINES_GFF)
-
-
[email protected](os.path.isdir(DATA_DIR) and os.path.isdir(REF_DIR),
-                     "%s or %s not available" % (DATA_DIR, REF_DIR))
+        f = h5py.File(rtc.task.output_files[2])
+        seqh_fwd = ''.join([f['base'][x*2] for x in range(5000)])
+        seqh_rev = ''.join([f['base'][(x*2)+1] for x in range(5000)])
+        self.assertEqual(len(seqh_fwd), 5000)
+        self.assertEqual(len(seqh_rev), 5000)
+
+
+# FIXME this is covered by integration tests but the output is twitchy enough
+# that it would be good to test here as well
+#@unittest.skipUnless(os.path.isdir(DATA_DIR) and os.path.isdir(REF_DIR),
+#                     "%s or %s not available" % (DATA_DIR, REF_DIR))
[email protected]("FIXME")
 class TestIpdSummaryChunk(TestIpdSummary):
     """
     This test is identical to the above except for using an AlignmentSet with
@@ -84,7 +89,6 @@ class TestIpdSummaryChunk(TestIpdSummary):

     def run_after(self, rtc, output_dir):
         gff_file = os.path.join(output_dir, rtc.task.output_files[0])
-        csv_file = os.path.join(output_dir, rtc.task.output_files[1])
         logging.critical(gff_file)
         logging.critical("%s %s" % (csv_file, os.path.getsize(csv_file)))
         with open(csv_file) as f:

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/kineticstools.git
