George and everybody:
Attached is i2b2_pset_build.py, rev 69feff7b00d9.
Usage:
i2b2_pset_build [options] <consented> <survey_order> <dummy_query>
It depends on docopt, pandas, and sqlalchemy, and it has some Oracle-isms.
Any chance it works for somebody else? If not, perhaps it's useful as
design notes?
For example: we keep our consented cohort in an internal REDCap project...
$ python i2b2_pset_build.py internal_redcap_export.csv
breast_cancer_survey_sample.csv "EMR Consented BC@18:30:18"
INFO:__main__:consented.count():
study_id 88
mrn 88
dtype: int64
INFO:__main__:consented_mrn.count():
order_id 88
mrn 88
study_id 88
dtype: int64
INFO:__main__:len(consented_crosswalk): 88
INFO:__main__:creating gpc_bc_consented
INFO:__main__:resulting patient set:
patient_set_id size1 size2
0 89175 88 88
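Both database connections come from environment variables (ID_DB_ACCESS
and DEID_DB_ACCESS by default; see --id-key and --deid-key). Set them to
SQLAlchemy URLs, something like this (with made-up credentials):

$ export ID_DB_ACCESS='oracle://me:sekret@id-db-host:1521/id_sid'
$ export DEID_DB_ACCESS='oracle://me:sekret@deid-db-host:1521/deid_sid'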
--
Dan
'''i2b2_pset_build -- build patient set from study ids
Usage:
i2b2_pset_build [options] <consented> <survey_order> <dummy_query>
Options:
<consented>         CSV file with study_id, mrn columns
<survey_order>      CSV file with order_id, mrn columns
--id-schema SCHEMA  schema for the identified patient_mapping table
                    [default: NIGHTHERONDATA]
--mrn-source CODE i2b2 patient_ide_source used for MRNs
in the patient_mapping table
[default: [email protected]]
--id-key NAME name of environment variable to find
SQLAlchemy DB URL for identified i2b2 database.
[default: ID_DB_ACCESS]
<dummy_query> name of existing query in deidentified i2b2
--deid-schema SCHEMA schema with the qt_* tables, where we will find
the dummy query (by name) and store the
new patient set
[default: BLUEHERONDATA]
--deid-key NAME     name of environment variable to find the
                    SQLAlchemy DB URL for the deidentified i2b2 database.
[default: DEID_DB_ACCESS]
--cohort-table NAME name of a scratch table where we can store the
                    cohort in the deid db [default: gpc_bc_consented]
--debug             verbose (DEBUG level) logging
--help              show this usage text and exit
'''
import logging
from docopt import docopt
from pandas import read_sql
log = logging.getLogger(__name__)
def main(argv, environ, cwd, create_engine):
'''See usage above.
@param argv: CLI args a la sys.argv
@param environ: process environment a la os.environ
@param cwd: access to files a la pathlib (plus pd.read_csv)
@param create_engine: access to databases by URL
a la sqlalchemy.create_engine
'''
cli = docopt(__doc__, argv=argv[1:])
log.debug('cli: %s', cli)
consented = read_consented(cwd / cli['<consented>'])
consented_mrn = mix_mrn(cwd / cli['<survey_order>'], consented)
consented_crosswalk = mrn_to_patient_id(
consented_mrn,
iddb=create_engine(environ[cli['--id-key']]),
id_dw=cli['--id-schema'], mrn_source=cli['--mrn-source'])
pset = add_pset_result(
cli['<dummy_query>'], consented_crosswalk,
db=create_engine(environ[cli['--deid-key']]),
schema=cli['--deid-schema'],
cohort_table=cli['--cohort-table'])
log.info('resulting patient set:\n%s', pset)
def read_consented(path):
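    '''Read the study_id, mrn columns of the consented cohort;
    refuse to go on if any MRN appears more than once.
    '''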
consented = path.read_csv()[['study_id', 'mrn']]
log.info('consented.count():\n%s', consented.count())
x = consented.sort_values('mrn')
dups = x.mrn[x.mrn.duplicated()]
if len(dups):
log.error('duplicate MRNs:\n%s', x[x.mrn.isin(dups)])
        raise IOError('duplicate MRNs in consented file')
return consented
def mix_mrn(path, consented):
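    '''Attach order ids to the consented cohort by merging
    the survey sample with it on MRN.
    '''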
survey_sample = _lower_cols(path.read_csv())[['order_id', 'mrn']]
consented_mrn = survey_sample.merge(consented).sort_values('study_id')
    log.info('consented_mrn.count():\n%s',
consented_mrn.count())
return consented_mrn
def mrn_to_patient_id(pat, iddb, mrn_source, id_dw):
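    '''Map consented MRNs to i2b2 patient_num (and date_shift)
    via the identified patient_mapping table.
    '''
    # The MRN list is interpolated into the SQL text (quoted via %d, so
    # only integers get through) rather than bound, since a single bind
    # parameter cannot carry a list for an IN clause.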
mrn_list_expr = ', '.join("'%d'" % n for n in pat.mrn)
crosswalk = read_sql('''
select distinct patient_num, to_number(patient_ide) mrn
, (select date_shift
from {id_dw}.patient_dimension pd
where pd.patient_num = pm.patient_num) date_shift
    from {id_dw}.patient_mapping pm
where pm.patient_ide_source = :mrn_source
and pm.patient_ide in ({mrn_list})
'''.format(mrn_list=mrn_list_expr,
id_dw=id_dw), iddb,
params=dict(mrn_source=mrn_source))
log.debug('%s', pat.columns)
log.debug('%s', crosswalk.columns)
consented_crosswalk = pat.merge(crosswalk)[[
'patient_num', 'study_id', 'date_shift']]
log.info('len(consented_crosswalk): %s', len(consented_crosswalk))
return consented_crosswalk
def add_pset_result(query_name, pat, db, schema,
cohort_table):
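    '''Record a new patient set against an existing (dummy) query.

    Look up the query instance by name, stage the cohort in a scratch
    table, then fill in qt_query_result_instance and
    qt_patient_set_collection.
    '''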
qi = db.execute('''
select qm.query_master_id, qi.query_instance_id
from {schema}.qt_query_master qm
join {schema}.qt_query_instance qi
on qi.query_master_id = qm.query_master_id
where qm.name = :query_name
'''.format(schema=schema), query_name=query_name).fetchone()
log.info('creating %s', cohort_table)
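    # to_sql writes the DataFrame index as a column named "index";
    # the insert below uses it to compute the 1-based set_index.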
pat[['patient_num', 'study_id']].to_sql(
cohort_table, db, if_exists='replace')
pset_id = db.execute('''
select {schema}.QT_SQ_QRI_QRIID.nextval pset_id from dual
'''.format(schema=schema)).fetchone().pset_id
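    # In the stock i2b2 QT lookup tables, result_type_id 1 is PATIENTSET
    # and status_type_id 3 is FINISHED.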
db.execute('''
insert into {schema}.qt_query_result_instance qri
(result_instance_id, query_instance_id, result_type_id,
set_size, real_set_size,
start_date, end_date, delete_flag, status_type_id,
description)
select :pset_id, :qiid, 1
, :set_size, :set_size
, sysdate, sysdate, 'N', 3
, 'Patient set for "' || :query_name || '"'
from dual
'''.format(schema=schema), pset_id=pset_id, qiid=qi.query_instance_id,
set_size=len(pat), query_name=query_name)
db.execute('''
insert into {schema}.qt_patient_set_collection
(patient_set_coll_id, result_instance_id, set_index, patient_num)
select
{schema}.QT_SQ_QPR_PCID.nextval
, :pset_id
, bc."index" + 1
, bc.patient_num
from {cohort_table} bc
'''.format(cohort_table=cohort_table, schema=schema),
pset_id=pset_id)
return read_sql(
'''
select
:pset_id patient_set_id,
(select count(*) from {cohort_table}) size1,
(select count(*) from {schema}.qt_patient_set_collection
where result_instance_id = :pset_id) size2
from dual
'''.format(cohort_table=cohort_table, schema=schema), db,
params=dict(pset_id=pset_id))
def _lower_cols(df):
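    '''Lower-case column names, since case in CSV exports varies.
    '''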
return df.rename(columns=dict((n, n.lower()) for n in df.keys()))
class PandasPath(object):
'''pathlib API mixed with pandas I/O
'''
def __init__(self, path, ops):
pathjoin, read_csv = ops
self.read_csv = lambda: read_csv(path)
self.pathjoin = lambda other: self.__class__(
pathjoin(path, other), ops)
    def __div__(self, other):
        return self.pathjoin(other)

    __truediv__ = __div__  # Python 3 spells the / operator __truediv__
if __name__ == '__main__':
def _script():
from sys import argv
from os import environ
from os.path import join as pathjoin
from pandas import read_csv
from sqlalchemy import create_engine
logging.basicConfig(level=logging.DEBUG if '--debug' in argv
else logging.INFO)
main(argv, cwd=PandasPath('.', (pathjoin, read_csv)),
environ=environ, create_engine=create_engine)
_script()