Very helpful! I will keep you posted if I have any issues or questions.
Best,
Eila


On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <chamik...@google.com>
wrote:

>
>
> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
> e...@orielresearch.org> wrote:
>
>> Hi Cham,
>>
>> Please see inline. If possible, code / pseudo code will help a lot.
>> Thanks,
>> Eila
>>
>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <chamik...@google.com
>> > wrote:
>>
>>> Hi Eila,
>>>
>>> Please find my comments inline.
>>>
>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>>> e...@orielresearch.org> wrote:
>>>
>>>> Hello all,
>>>>
>>>> It was nice to meet you last week!!!
>>>>
>>>>
>>> It was nice to meet you as well :)
>>>
>>>
>>>> I am writing a genomic PCollection, created from BigQuery, to a
>>>> folder. Following is the code with output so you can run it with any small
>>>> BQ table and let me know what your thoughts are:
>>>>
>>>> This init is only for debugging. In production I will use the pipeline
>>>> syntax
>>>>
>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},
>>>>         {u'index': u'GSM2316666', u'SNRPCP14': 0},
>>>>         {u'index': u'GSM2312355', u'SNRPCP14': 0},
>>>>         {u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>
>>>> rows[1].keys()
>>>> # output:  [u'index', u'SNRPCP14']
>>>>
>>>> # you can change `archs4.results_20180308_*` to any other table name
>>>> # with an index column
>>>> queries2 = rows | beam.Map(lambda x: (
>>>>     beam.io.Read(beam.io.BigQuerySource(
>>>>         project='orielresearch-188115', use_standard_sql=False,
>>>>         query=str('SELECT * FROM `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>>     str('gs://archs4/output/'+x["index"]+'/')))
>>>>
>>>
>>> I don't think the above code will work (not portable across runners at
>>> least). BigQuerySource (along with the Read transform) has to be applied
>>> to a Pipeline object. So probably change this to a for loop that creates a
>>> set of read transforms and use Flatten to create a single PCollection.
>>>
>> For debugging, I am running on the local Datalab runner. For production,
>> I will be running only on the Dataflow runner. I think that I was able to
>> query the tables that way; I will double check it. The indexes could go
>> into the millions. My concern is that I will not be able to leverage
>> Beam's distribution capability when I use the loop option. Any thoughts
>> on that?
>>
>
> You mean you'll have millions of queries? That will not be scalable. My
> suggestion was to loop over queries. Can you reduce this to one or a small
> number of queries and perform further processing in Beam?
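
One way to follow the suggestion above is to fold the per-index lookups into a single query. This is a minimal sketch, assuming standard SQL and the table and `index` column from the original message. The helper name `build_single_query` is hypothetical and is not part of Beam or BigQuery.

```python
def build_single_query(indexes, table="archs4.results_20180308_*"):
    """Return one standard-SQL query selecting every row whose index is in `indexes`.

    Hypothetical helper for illustration; the default table name is taken
    from the original message.
    """
    if not indexes:
        raise ValueError("need at least one index value")
    # Escape backslashes and single quotes so each value stays inside its literal.
    quoted = ", ".join(
        "'%s'" % str(i).replace("\\", "\\\\").replace("'", "\\'") for i in indexes
    )
    return "SELECT * FROM `%s` WHERE index IN (%s)" % (table, quoted)


rows = [
    {"index": "GSM2313641", "SNRPCP14": 0},
    {"index": "GSM2316666", "SNRPCP14": 0},
    {"index": "GSM2312355", "SNRPCP14": 0},
    {"index": "GSM2312372", "SNRPCP14": 0},
]
query = build_single_query([r["index"] for r in rows])
print(query)
```

The resulting string could be passed as the `query` argument of a single BigQuery read, with the per-index split done afterwards inside Beam. With millions of index values an `IN` list would likely become too large to be practical; selecting the whole table and grouping by `index` in the pipeline may then be the simpler route.
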
>
>
>>
>>>
>>>>
>>>> queries2
>>>> # output: a list of pCollection and the path to write the pCollection
>>>> data to
>>>>
>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>   'gs://archs4/output/GSM2313641/'),
>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>   'gs://archs4/output/GSM2316666/'),
>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>   'gs://archs4/output/GSM2312355/'),
>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>   'gs://archs4/output/GSM2312372/')]
>>>>
>>>>
>>> What you got here is a PCollection of PTransform objects which is not
>>> useful.
>>>
>>>
>>>>
>>>> *# this is my challenge*
>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>>>> COLUMN")
>>>>
>>>>
>>> Once you update the above code you will get a proper PCollection of
>>> elements read from BigQuery. You can transform and write this (to files,
>>> BQ, or any other sink) as needed.
>>>
>>
>> It is a list of tuples, each with a PCollection and the path to write to.
>> The path is not unique, and I might have more than one PCollection written
>> to the same destination. How do I pass the path from the tuple list as a
>> parameter to the text file name? Could you please add the code that you
>> were thinking about?
>>
>
> The Python SDK does not support writing to different files based on the
> values of data (dynamic writes). So you'll have to either partition data
> into separate PCollections or write all data into the same location.
>
> Here's *pseudocode* (untested) for reading from a few queries, partitioning
> into several PCollections, and writing to different destinations.
>
> import apache_beam as beam
>
> queries = ['select * from A', 'select * from B', ...]
>
> p = beam.Pipeline()
> pcollections = []
> for i, query in enumerate(queries):
>   # Each applied transform needs a unique label within the pipeline.
>   pc = p | 'read %d' % i >> beam.io.Read(beam.io.BigQuerySource(query=query))
>   pcollections.append(pc)
>
> all_data = pcollections | beam.Flatten()
> # beam.Partition takes the partition function and the number of partitions.
> partitions = all_data | beam.Partition(my_partition_fn, num_partitions)
> for i, partition in enumerate(partitions):
>   partition | 'write %d' % i >> beam.io.WriteToText(<unique path for partition i>)
>
> Hope this helps.
>
> Thanks,
> Cham
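
The pseudocode above leaves `my_partition_fn` undefined. The following is a minimal sketch of what such a function might look like, assuming one partition per `index` value and a lookup table built up front. The names `partition_by_index`, `index_to_partition`, and the `part` file prefix are hypothetical. The function is plain Python, so it can be checked without running a pipeline.

```python
def partition_by_index(row, num_partitions, index_to_partition):
    """Return the partition number for `row`.

    Follows the (element, num_partitions, *extra_args) calling convention
    that beam.Partition uses for its partition function.
    """
    partition = index_to_partition[row["index"]]
    if not 0 <= partition < num_partitions:
        raise ValueError("partition %d out of range" % partition)
    return partition


# Index values taken from the original message; one partition per value.
indexes = ["GSM2313641", "GSM2316666", "GSM2312355", "GSM2312372"]
index_to_partition = {name: i for i, name in enumerate(indexes)}
# Assumed layout: one output prefix per index; "part" is an illustrative prefix.
output_paths = ["gs://archs4/output/%s/part" % name for name in indexes]

rows = [{"index": name, "SNRPCP14": 0} for name in indexes]
assigned = [partition_by_index(r, len(indexes), index_to_partition) for r in rows]
print(assigned)

# Inside a pipeline this would be wired up roughly as (untested):
#   partitions = all_data | beam.Partition(
#       partition_by_index, len(indexes), index_to_partition)
#   for i, pc in enumerate(partitions):
#       pc | 'write %d' % i >> beam.io.WriteToText(output_paths[i])
```

Because the number of partitions is fixed when the pipeline is built, this approach fits a small, known set of destinations rather than millions of distinct index values.
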
>
>
>
>>> Please see programming guide on how to write to text files (section 5.3
>>> and click Python tab):
>>> https://beam.apache.org/documentation/programming-guide/
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>> Do you have any idea how to sink the data to a text file? I have tried
>>>> a few other options and got stuck at the write transform.
>>>>
>>>> Any advice is very appreciated.
>>>>
>>>> Thanks,
>>>> Eila
>>>>
>>>>
>>>>
>>>> --
>>>> Eila
>>>> www.orielresearch.org
>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>
>>>
>>
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/
>>
>


-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/
