Adding dev. I am not sure if this is a dev or user question.
Thanks,
Eila
---------- Forwarded message ----------
From: OrielResearch Eila Arich-Landkof <[email protected]>
Date: Mon, Apr 2, 2018 at 11:33 AM
Subject: H5 potential intermediate solution
To: [email protected]


Hello all,

I would like to try a different way to leverage Apache Beam for an H5 => BQ
(file to table) transfer.

For my use case, I would like to read the H5 data (numpy array format) 10K
rows at a time, transpose each block, and write it to BQ as 10K columns. 10K
is the BQ column limit.
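
A minimal sketch of the chunk-and-transpose step described above, assuming the H5 dataset behaves like a 2-D numpy array. The small `expression` array, the helper name `iter_transposed_chunks`, and the tiny `chunk_size` are placeholders for illustration only; the real chunk size would be 10000.

```python
import numpy as np

def iter_transposed_chunks(expression, chunk_size):
    """Yield (row_start, transposed_block) for consecutive blocks of rows."""
    for row_start in range(0, expression.shape[0], chunk_size):
        # Slice with a colon: rows row_start up to row_start + chunk_size.
        block = expression[row_start:row_start + chunk_size, :]
        # np.transpose returns the transposed array; block is not changed in place.
        yield row_start, np.transpose(block)

# Stand-in for the H5 data (hypothetical values): 5 rows x 3 columns.
expression = np.arange(15).reshape(5, 3)
chunks = list(iter_transposed_chunks(expression, chunk_size=2))
```

Each transposed block has one row per original column, so a block of 10K input rows becomes rows that are 10K values wide. The last block may be narrower when the row count is not a multiple of the chunk size.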

My code is below and raises the following error (I might have missed
something basic). I am not using beam.Create; instead I am trying to create a
PCollection from the ParDo transform. Is this possible? If not, what is the
alternative for creating a PCollection from a numpy array (if any)?

ERROR:root:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f00aad7b7a0>, due to an exception.
 Traceback (most recent call last):
  File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 307, in call
    side_input_values)
  File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 332, in attempt_call
    evaluator.start_bundle()
  File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 540, in start_bundle
    self._applied_ptransform.inputs[0].windowing,
AttributeError: 'PBegin' object has no attribute 'windowing'

ERROR:root:Giving up after 4 attempts.
WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'
WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'



*Code:*

options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'orielresearch-188115'
google_cloud_options.job_name = 'h5-to-bq-10K'
google_cloud_options.staging_location = 'gs://archs4/staging'
google_cloud_options.temp_location = 'gs://archs4/temp'
options.view_as(StandardOptions).runner = 'DirectRunner'

p = beam.Pipeline(options=options)

class read10kRowsDoFn(beam.DoFn):
  def process(self, element,index):
    print(index)
    row_start = index
    row_end = index+10000
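    # Slice 10K rows (returns a numpy.ndarray), then transpose.
    d = expression[row_start:row_end, :]
    # np.transpose returns the transposed array; it does not modify d in place.
    d = np.transpose(d)
    return d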

#for i in range(0,expression.shape[0],10000):
k=210 # allows creating unique labels for the runner
for i in range(0,3,2): # test
  k+=1
  bigQuery_dataset_table_name = bigquery_dataset_name+'.'+bigquery_table_name+str(k)
  print(bigQuery_dataset_table_name)
  label_read_row = "read "+bigQuery_dataset_table_name
  label_write_col = "write "+bigQuery_dataset_table_name
  # Is it possible to generate a PCollection with ParDo without Create?
  (p | label_read_row >> beam.ParDo(read10kRowsDoFn(i))
     | label_write_col >> beam.io.Write(
         beam.io.BigQuerySink(bigQuery_dataset_table_name)))

p.run().wait_until_finish()  # fires an error

Many thanks,

-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/


