George and everybody:

Attached find i2b2_pset_build.py rev 69feff7b00d9.

Usage:
  i2b2_pset_build [options] <consented> <survey_order> <dummy_query>

It depends on docopt, pandas, and sqlalchemy, and it's got some Oracle-isms. Any
chance it works for somebody else? Perhaps it's useful as design notes?

For example: we keep our consented cohort in an internal REDCap project...

$ python i2b2_pset_build.py internal_redcap_export.csv \
    breast_cancer_survey_sample.csv "EMR Consented BC@18:30:18"

INFO:__main__:consented.count():
study_id    88
mrn         88
dtype: int64
INFO:__main__:consented_mrn.count()
order_id    88
mrn         88
study_id    88
dtype: int64
INFO:__main__:len(consented_crosswalk): 88
INFO:__main__:creating gpc_bc_consented
INFO:__main__:resulting patient set:
   patient_set_id  size1  size2
0           89175     88     88
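
In case the CSV shapes aren't obvious from the output above: the consented
export needs study_id and mrn columns, the survey sample needs order_id and
mrn, and the script just merges them on mrn before looking up patient_num.
A rough sketch with made-up values (the real ones come from the REDCap export
and the survey order file):

from pandas import DataFrame

consented = DataFrame({'study_id': [1, 2], 'mrn': [111, 222]})
survey_sample = DataFrame({'order_id': [90, 91], 'mrn': [111, 222]})
print(survey_sample.merge(consented))  # inner join on the shared mrn column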


--
Dan

'''i2b2_pset_build -- build patient set from study ids

Usage:
  i2b2_pset_build [options] <consented> <survey_order> <dummy_query>

Options:
  <consented>           CSV file with study_id, mrn columns
  <survey_order>        CSV file with order_id, mrn columns
  --id-schema=SCHEMA    schema with the identified patient_mapping
                        and patient_dimension tables
                        [default: NIGHTHERONDATA]
  --mrn-source CODE     i2b2 patient_ide_source used for MRNs
                        in the patient_mapping table
                        [default: [email protected]]
  --id-key NAME         name of environment variable to find
                        SQLAlchemy DB URL for identified i2b2 database.
                        [default: ID_DB_ACCESS]
  <dummy_query>         name of existing query in deidentified i2b2
  --deid-schema SCHEMA  schema with the qt_* tables where we will find
                        the dummy query (by name) and store the
                        new patient set
                        [default: BLUEHERONDATA]
  --deid-key NAME       name of environment variable to find
                        SQLAlchemy DB URL for the deidentified i2b2 database.
                        [default: DEID_DB_ACCESS]
  --cohort-table NAME   name of scratch table where we can store the
                        cohort in the deid db [default: gpc_bc_consented]
  --debug
  --help

'''
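
# The --id-key / --deid-key options name environment variables that hold
# SQLAlchemy database URLs.  For example (placeholder credentials and hosts,
# not a real deployment):
#
#   ID_DB_ACCESS=oracle://i2b2:sekret@id-db-host:1521/ID1
#   DEID_DB_ACCESS=oracle://i2b2:sekret@deid-db-host:1521/DEID1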

import logging

from docopt import docopt
from pandas import read_sql

log = logging.getLogger(__name__)


def main(argv, environ, cwd, create_engine):
    '''See usage above.

    @param argv: CLI args a la sys.argv
    @param environ: process environment a la os.environ
    @param cwd: access to files a la pathlib (plus pd.read_csv)
    @param create_engine: access to databases by URL
                          a la sqlalchemy.create_engine
    '''
    cli = docopt(__doc__, argv=argv[1:])
    log.debug('cli: %s', cli)
    consented = read_consented(cwd / cli['<consented>'])
    consented_mrn = mix_mrn(cwd / cli['<survey_order>'], consented)
    consented_crosswalk = mrn_to_patient_id(
        consented_mrn,
        iddb=create_engine(environ[cli['--id-key']]),
        id_dw=cli['--id-schema'], mrn_source=cli['--mrn-source'])
    pset = add_pset_result(
        cli['<dummy_query>'], consented_crosswalk,
        db=create_engine(environ[cli['--deid-key']]),
        schema=cli['--deid-schema'],
        cohort_table=cli['--cohort-table'])
    log.info('resulting patient set:\n%s', pset)


def read_consented(path):
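    '''Read the consented cohort (study_id, mrn), refusing duplicate MRNs.'''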
    consented = path.read_csv()[['study_id', 'mrn']]
    log.info('consented.count():\n%s', consented.count())

    x = consented.sort_values('mrn')
    dups = x.mrn[x.mrn.duplicated()]
    if len(dups):
        log.error('duplicate MRNs:\n%s', x[x.mrn.isin(dups)])
        raise IOError('duplicate MRNs in consented file')
    return consented


def mix_mrn(path, consented):
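    '''Merge the survey sample (order_id, mrn) with the consented cohort on mrn.'''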
    survey_sample = _lower_cols(path.read_csv())[['order_id', 'mrn']]
    consented_mrn = survey_sample.merge(consented).sort_values('study_id')
    log.info('consented_mrn.count()\n%s',
             consented_mrn.count())
    return consented_mrn


def mrn_to_patient_id(pat, iddb, mrn_source, id_dw):
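    '''Look up i2b2 patient_num (and date_shift) for each consented MRN
    via the identified patient_mapping table.
    '''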
    mrn_list_expr = ', '.join("'%d'" % n for n in pat.mrn)
    crosswalk = read_sql('''
        select distinct patient_num, to_number(patient_ide) mrn
             , (select date_shift
                from {id_dw}.patient_dimension pd
                where pd.patient_num = pm.patient_num) date_shift
        from {id_dw}.patient_mapping pm
        where pm.patient_ide_source = :mrn_source
        and pm.patient_ide in ({mrn_list})
        '''.format(mrn_list=mrn_list_expr,
                   id_dw=id_dw), iddb,
                         params=dict(mrn_source=mrn_source))
    log.debug('%s', pat.columns)
    log.debug('%s', crosswalk.columns)
    consented_crosswalk = pat.merge(crosswalk)[[
        'patient_num', 'study_id', 'date_shift']]
    log.info('len(consented_crosswalk): %s', len(consented_crosswalk))
    return consented_crosswalk


def add_pset_result(query_name, pat, db, schema,
                    cohort_table):
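    '''Record the cohort as a new patient set under an existing (dummy) query.

    Loads the cohort into a scratch table, takes a fresh result_instance_id
    from the QT_SQ_QRI_QRIID sequence, and fills in qt_query_result_instance
    and qt_patient_set_collection.
    '''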
    qi = db.execute('''
    select qm.query_master_id, qi.query_instance_id
    from {schema}.qt_query_master qm
    join {schema}.qt_query_instance qi
      on qi.query_master_id = qm.query_master_id
    where qm.name = :query_name
    '''.format(schema=schema), query_name=query_name).fetchone()

    log.info('creating %s', cohort_table)
    pat[['patient_num', 'study_id']].to_sql(
        cohort_table, db, if_exists='replace')

    pset_id = db.execute('''
    select {schema}.QT_SQ_QRI_QRIID.nextval pset_id from dual
    '''.format(schema=schema)).fetchone().pset_id

    db.execute('''
    insert into {schema}.qt_query_result_instance qri
      (result_instance_id, query_instance_id, result_type_id,
       set_size, real_set_size,
       start_date, end_date, delete_flag, status_type_id,
       description)

    select :pset_id, :qiid, 1
         , :set_size, :set_size
         , sysdate, sysdate, 'N', 3
         , 'Patient set for "' || :query_name || '"'
    from dual
    '''.format(schema=schema), pset_id=pset_id, qiid=qi.query_instance_id,
               set_size=len(pat), query_name=query_name)

    db.execute('''
    insert into {schema}.qt_patient_set_collection
      (patient_set_coll_id, result_instance_id, set_index, patient_num)

    select
        {schema}.QT_SQ_QPR_PCID.nextval
      , :pset_id
      , bc."index" + 1
      , bc.patient_num
    from {cohort_table} bc
    '''.format(cohort_table=cohort_table, schema=schema),
               pset_id=pset_id)

    return read_sql(
        '''
        select
          :pset_id patient_set_id,
          (select count(*) from {cohort_table}) size1,
          (select count(*) from {schema}.qt_patient_set_collection
           where result_instance_id = :pset_id) size2
        from dual
        '''.format(cohort_table=cohort_table, schema=schema), db,
        params=dict(pset_id=pset_id))


def _lower_cols(df):
    return df.rename(columns=dict((n, n.lower()) for n in df.keys()))


class PandasPath(object):
    '''pathlib API mixed with pandas I/O
    '''
    def __init__(self, path, ops):
        pathjoin, read_csv = ops
        self.read_csv = lambda: read_csv(path)
        self.pathjoin = lambda other: self.__class__(
            pathjoin(path, other), ops)

    def __div__(self, other):
        return self.pathjoin(other)

    __truediv__ = __div__  # Python 3 spells the "/" operator __truediv__
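
# e.g. (PandasPath('.', (pathjoin, read_csv)) / 'consented.csv').read_csv()
# returns a DataFrame; main() only touches the filesystem through an object
# like this, which is handy for testing with a fake read_csv.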


if __name__ == '__main__':
    def _script():
        from sys import argv
        from os import environ
        from os.path import join as pathjoin
        from pandas import read_csv
        from sqlalchemy import create_engine

        logging.basicConfig(level=logging.DEBUG if '--debug' in argv
                            else logging.INFO)
        main(argv, cwd=PandasPath('.', (pathjoin, read_csv)),
             environ=environ, create_engine=create_engine)

    _script()