Hi All,

I got it working by creating the PCollection from the numpy array with
beam.Create. However, I could not write to BQ, because the BigQuery sink
asked for a schema.
The code:
(p | "create all" >> beam.Create(expression[1:5,1:5])
   | "write all text" >> beam.io.WriteToText('gs://archs4/output/',
                                             file_name_suffix='.txt'))

*Is there a workaround for providing a schema to beam.io.BigQuerySink?*
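
One possible workaround, as a sketch under assumptions and not something confirmed in this thread: the BigQuery sinks in the Beam Python SDK accept the schema as a comma-separated 'name:TYPE' string, and each row is written as a dictionary keyed by column name. The helper names (schema_string, row_to_dict, build_pipeline), the col_N column naming, the FLOAT type, and the table argument are all hypothetical. The sketch uses beam.io.WriteToBigQuery; the same string can be passed as schema= to beam.io.BigQuerySink.

```python
def schema_string(n_cols, prefix="col_", bq_type="FLOAT"):
    """Build a 'name:TYPE,name:TYPE' schema string for n_cols columns."""
    return ",".join("%s%d:%s" % (prefix, i, bq_type) for i in range(n_cols))


def row_to_dict(row, prefix="col_"):
    """Turn one array row into the dict-per-row shape the BigQuery sink expects."""
    return {"%s%d" % (prefix, i): float(v) for i, v in enumerate(row)}


def build_pipeline(rows, table):
    """Hypothetical wiring; `table` is assumed to look like 'dataset.table'."""
    import apache_beam as beam  # imported lazily so the helpers work without Beam

    p = beam.Pipeline()
    (p | "create rows" >> beam.Create(rows)
       | "rows to dicts" >> beam.Map(row_to_dict)
       | "write to bq" >> beam.io.WriteToBigQuery(
           table,
           schema=schema_string(len(rows[0])),
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    return p
```

With the array from the code above, this would be called roughly as build_pipeline(expression[1:5,1:5], 'my_dataset.my_table'), where the table name is a placeholder.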

Many thanks,
Eila

On Mon, Apr 2, 2018 at 11:33 AM, OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Hello all,
>
> I would like to try a different way to leverage Apache Beam for H5 => BQ
> (file to table transfer).
>
> For my use case, I would like to read every 10K rows of H5 data (numpy
> array format), transpose them, and write them to BQ as 10K columns. 10K is
> the BQ column limit.
>
> My code is below and raises the following error (I might have missed
> something basic). I am not using beam.Create; I am trying to create a
> PCollection from the ParDo transform. Is this possible? If not, what is the
> alternative for creating a PCollection from a numpy array (if any)?
>
> ERROR:root:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f00aad7b7a0>, due to an exception.
>  Traceback (most recent call last):
>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 307, in call
>     side_input_values)
>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 332, in attempt_call
>     evaluator.start_bundle()
>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 540, in start_bundle
>     self._applied_ptransform.inputs[0].windowing,
> AttributeError: 'PBegin' object has no attribute 'windowing'
>
> ERROR:root:Giving up after 4 attempts.
> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'
> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'
>
>
>
> *Code:*
>
> options = PipelineOptions()
> google_cloud_options = options.view_as(GoogleCloudOptions)
> google_cloud_options.project = 'orielresearch-188115'
> google_cloud_options.job_name = 'h5-to-bq-10K'
> google_cloud_options.staging_location = 'gs://archs4/staging'
> google_cloud_options.temp_location = 'gs://archs4/temp'
> options.view_as(StandardOptions).runner = 'DirectRunner'
>
> p = beam.Pipeline(options=options)
>
> class read10kRowsDoFn(beam.DoFn):
>   def process(self, element,index):
>     print(index)
>     row_start = index
>     row_end = index+10000
>     # returns numpy array - numpy.ndarray
>     d = expression[row_start,row_end,:]
>     np.transpose(d)
>     return(d)
>
> #for i in range(0,expression.shape[0],10000):
> k=210 # allows creating unique labels for the runner
> for i in range(0,3,2): # test
>   k+=1
>   bigQuery_dataset_table_name=bigquery_dataset_name+'.'+bigquery_table_name+str(k)
>   print(bigQuery_dataset_table_name)
>   label_read_row = "read "+bigQuery_dataset_table_name
>   label_write_col = "write "+bigQuery_dataset_table_name
>   # Is it possible to generate a PCollection with ParDo without Create?
>   (p | label_read_row >> beam.ParDo(read10kRowsDoFn(i))
>      | label_write_col >> beam.io.Write(
>          beam.io.BigQuerySink(bigQuery_dataset_table_name)))
>
> p.run().wait_until_finish()  # fires an error
>
> Many thanks,
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
>
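
On the 'PBegin' error in the quoted message: a ParDo needs an input PCollection, and applying it directly to the pipeline object hands it a PBegin instead. A minimal sketch of one alternative, not a definitive implementation, is to seed the pipeline with the block start indices via beam.Create and let a FlatMap (a ParDo that may emit several outputs per element) read and transpose each block. The names block_starts, transposed_block, and build_pipeline are hypothetical. The helpers work on a plain list of rows so they can be checked without numpy; with a numpy array the equivalent slice is expression[row_start:row_end, :].T (the quoted expression[row_start,row_end,:] indexes three axes and does not slice a row range).

```python
def block_starts(n_rows, block_size):
    """Start index of every block of block_size rows."""
    return list(range(0, n_rows, block_size))


def transposed_block(row_start, matrix, block_size):
    """Read rows [row_start, row_start + block_size) and return them transposed.

    Each returned item is one column of the block, i.e. one output row.
    """
    block = matrix[row_start:row_start + block_size]
    return [list(column) for column in zip(*block)]


def build_pipeline(matrix, block_size):
    """Hypothetical wiring: Create supplies the elements, FlatMap does the reading."""
    import apache_beam as beam  # imported lazily so the helpers work without Beam

    p = beam.Pipeline()
    columns = (
        p
        | "block starts" >> beam.Create(block_starts(len(matrix), block_size))
        # Extra positional arguments are passed to the function after the element.
        # Assumption: matrix is small enough to be shipped with the pipeline.
        | "read and transpose" >> beam.FlatMap(transposed_block, matrix, block_size))
    return p, columns
```

The resulting PCollection holds one list per transposed column, which can then be mapped to dictionaries and written to a BigQuery sink together with a schema.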



-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/
