Very helpful! I will keep you posted if I have any issues or questions.

Best,
Eila
On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <chamik...@google.com> wrote:

> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <e...@orielresearch.org> wrote:
>
>> Hi Cham,
>>
>> Please see inline. If possible, code / pseudocode will help a lot.
>> Thanks,
>> Eila
>>
>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <chamik...@google.com> wrote:
>>
>>> Hi Eila,
>>>
>>> Please find my comments inline.
>>>
>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <e...@orielresearch.org> wrote:
>>>
>>>> Hello all,
>>>>
>>>> It was nice to meet you last week!
>>>>
>>> It was nice to meet you as well :)
>>>
>>>> I am writing a genomic PCollection that is created from BigQuery to a
>>>> folder. Below is the code with its output, so you can run it with any
>>>> small BQ table and let me know what your thoughts are:
>>>>
>> This init is only for debugging. In production I will use the pipeline
>> syntax.
>>
>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},
>>>>         {u'index': u'GSM2316666', u'SNRPCP14': 0},
>>>>         {u'index': u'GSM2312355', u'SNRPCP14': 0},
>>>>         {u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>
>>>> rows[1].keys()
>>>> # output: [u'index', u'SNRPCP14']
>>>>
>>>> # you can change `archs4.results_20180308_` to any other table name
>>>> # with an index column
>>>> queries2 = rows | beam.Map(
>>>>     lambda x: (beam.io.Read(beam.io.BigQuerySource(
>>>>                    project='orielresearch-188115',
>>>>                    use_standard_sql=False,
>>>>                    query=str('SELECT * FROM `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>>                str('gs://archs4/output/' + x["index"] + '/')))
>>>>
>>> I don't think the above code will work (it is not portable across
>>> runners, at least). BigQuerySource (along with the Read transform) has
>>> to be applied to a Pipeline object. So probably change this to a for
>>> loop that creates a set of read transforms and use Flatten to create a
>>> single PCollection.
>>>
>> For debug, I am running on the local datalab runner. For production, I
>> will be running only on the Dataflow runner. I think that I was able to
>> query the tables that way; I will double-check it. The indexes could go
>> to millions - my concern is that I will not be able to leverage Beam's
>> distribution capability when I use the loop option. Any thoughts on
>> that?
>>
> You mean you'll have millions of queries. That will not be scalable. My
> suggestion was to loop on queries. Can you reduce to one or a small
> number of queries and perform further processing in Beam?
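> For example, here is a minimal sketch (untested, and just a suggestion;
> the project and table names are taken from your snippet, and I'm
> assuming the rows come back as dicts with an 'index' key) of reading
> everything with a single query and keying each row by its index for
> further processing in Beam:
>
> import apache_beam as beam
>
> p = beam.Pipeline()
> rows = (p
>         | 'read' >> beam.io.Read(beam.io.BigQuerySource(
>               project='orielresearch-188115',
>               query='SELECT * FROM `archs4.results_20180308_*`',
>               use_standard_sql=True))
>         # BigQuery rows arrive as dicts keyed by column name; key each
>         # row by its sample index so downstream transforms
>         # (GroupByKey, Partition, ...) can work per index.
>         | 'key by index' >> beam.Map(lambda row: (row['index'], row)))
>
> From there, a GroupByKey (or the Partition approach below) gives you
> per-index groups without issuing one BigQuery job per index.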
>>>> queries2
>>>> # output: a list of PCollections and the paths to write the
>>>> # PCollection data to
>>>>
>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>   'gs://archs4/output/GSM2313641/'),
>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>   'gs://archs4/output/GSM2316666/'),
>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>   'gs://archs4/output/GSM2312355/'),
>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>   'gs://archs4/output/GSM2312372/')]
>>>>
>>> What you got here is a PCollection of PTransform objects, which is not
>>> useful.
>>>
>>>> # this is my challenge
>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND COLUMN")
>>>>
>>> Once you update the above code you will get a proper PCollection of
>>> elements read from BigQuery. You can transform and write this (to
>>> files, BQ, or any other sink) as needed.
>>>
>> It is a list of tuples with the PCollection and the path to write to.
>> The path is not unique, and I might have more than one PCollection
>> written to the same destination. How do I pass the path from the tuple
>> list as a parameter to the text file name? Could you please add the
>> code that you were thinking about?
>>
> The Python SDK does not support writing to different files based on the
> values of the data (dynamic writes). So you'll have to either partition
> the data into separate PCollections or write all the data into the same
> location.
>
> Here's pseudocode (untested) for reading from a few queries,
> partitioning into several PCollections, and writing to different
> destinations:
>
> queries = ['select * from A', 'select * from B', ...]
>
> p = Pipeline()
> pcollections = []
> for query in queries:
>     pc = p | beam.io.Read(beam.io.BigQuerySource(query=query))
>     pcollections.append(pc)
>
> all_data = pcollections | beam.Flatten()
> partitions = all_data | beam.Partition(my_partition_fn, num_partitions)
> for i, partition in enumerate(partitions):
>     partition | beam.io.WriteToText(<unique path for partition i>)
>
> Hope this helps.
>
> Thanks,
> Cham
>
>>> Please see the programming guide on how to write to text files
>>> (section 5.3, and click the Python tab):
>>> https://beam.apache.org/documentation/programming-guide/
>>>
>>> Thanks,
>>> Cham
>>>
>>>> Do you have any idea how to sink the data to a text file? I have
>>>> tried a few other options and was stuck at the write transform.
>>>>
>>>> Any advice is very appreciated.
>>>>
>>>> Thanks,
>>>> Eila

--
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/
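For reference, a fuller (still untested) version of the partition sketch
above. The hash-based partition function, the number of partitions, the
queries, and the output paths are all illustrative assumptions, not values
from the thread:

import apache_beam as beam

NUM_PARTITIONS = 4  # illustrative; one output folder per partition

def my_partition_fn(row, num_partitions):
    # Route each row to a partition, e.g. by hashing its index column.
    return hash(row['index']) % num_partitions

queries = ['SELECT * FROM `archs4.results_20180308_1`',  # illustrative
           'SELECT * FROM `archs4.results_20180308_2`']

p = beam.Pipeline()

pcollections = []
for i, query in enumerate(queries):
    # Each application of Read needs a unique label.
    pc = p | 'read %d' % i >> beam.io.Read(beam.io.BigQuerySource(
        project='orielresearch-188115', query=query, use_standard_sql=True))
    pcollections.append(pc)

all_data = tuple(pcollections) | beam.Flatten()
partitions = all_data | beam.Partition(my_partition_fn, NUM_PARTITIONS)

for i, partition in enumerate(partitions):
    (partition
     | 'stringify %d' % i >> beam.Map(str)  # WriteToText writes one string per line
     | 'write %d' % i >> beam.io.WriteToText(
           'gs://archs4/output/partition-%d/part' % i))  # illustrative path

p.run()

Note that rows which hash to the same partition land in the same folder; to
get one folder per index (the original gs://archs4/output/<index>/ layout),
the number of partitions would have to equal the number of distinct indexes,
which is exactly what does not scale to millions.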