[ 
https://issues.apache.org/jira/browse/BEAM-8452?focusedWorklogId=340785&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340785
 ]

ASF GitHub Bot logged work on BEAM-8452:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Nov/19 22:25
            Start Date: 08/Nov/19 22:25
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on pull request #10000: BEAM-8452 
- TriggerLoadJobs.process in bigquery_file_loads schema is type str
URL: https://github.com/apache/beam/pull/10000#discussion_r344390667
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
 ##########
 @@ -387,6 +387,14 @@ def process(self, element, load_job_name_prefix, 
*schema_side_inputs):
     else:
       schema = self.schema
 
+    import unicode
+    import json
+
+    if isinstance(schema, (str, unicode)):
+      schema = bigquery_tools.parse_table_schema_from_json(schema)
+    elif isinstance(schema, dict):
 
 Review comment:
   Thanks. Can you please add a unit test.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 340785)
    Time Spent: 1.5h  (was: 1h 20m)

> TriggerLoadJobs.process in bigquery_file_loads schema is type str
> -----------------------------------------------------------------
>
>                 Key: BEAM-8452
>                 URL: https://issues.apache.org/jira/browse/BEAM-8452
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.15.0, 2.16.0
>            Reporter: Noah Goodrich
>            Assignee: Noah Goodrich
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
>  I've found a first issue with the BigQueryFileLoads Transform and the type 
> of the schema parameter.
> {code:java}
> Triggering job 
> beam_load_2019_10_11_140829_19_157670e4d458f0ff578fbe971a91b30a_1570802915 to 
> load data to BigQuery table <TableReference
>  datasetId: 'pyr_monat_dev'
>  projectId: 'icentris-ml-dev'
>  tableId: 'tree_user_types'>.Schema: {"fields": [{"name": "id", "type": 
> "INTEGER", "mode": "required"}, {"name": "description", "type": "STRING", 
> "mode": "nullable"}]}. Additional parameters: {}
> Retry with exponential backoff: waiting for 4.875033410381894 seconds before 
> retrying _insert_load_job because we caught exception: 
> apitools.base.protorpclite.messages.ValidationError: Expected type <clas
> s 
> 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableSchema'>
>  for field schema, found {"fields": [{"name": "id", "type": "INTEGER", 
> "mode": "required"}, {"name": "description", "type"
> : "STRING", "mode": "nullable"}]} (type <class 'str'>)
>  Traceback for above exception (most recent call last):
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/utils/retry.py", 
> line 206, in wrapper
>     return fun(*args, **kwargs)
>   File 
> "/opt/conda/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py",
>  line 344, in _insert_load_job
>     **additional_load_parameters
>   File 
> "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py",
>  line 791, in __init__
>     setattr(self, name, value)
>   File 
> "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py",
>  line 973, in __setattr__
>     object.__setattr__(self, name, value)
>   File 
> "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py",
>  line 1652, in __set__
>     super(MessageField, self).__set__(message_instance, value)
>   File 
> "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py",
>  line 1293, in __set__
>     value = self.validate(value)
>   File 
> "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py",
>  line 1400, in validate
>     return self.__validate(value, self.validate_element)
>   File 
> "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py",
>  line 1358, in __validate
>     return validate_element(value)   
>   File 
> "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py",
>  line 1340, in validate_element
>     (self.type, name, value, type(value)))
>  
> {code}
>  
> The triggering code looks like this:
>  
> options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']
>         # Save main session state so pickled functions and classes
>         # defined in __main__ can be unpickled
>         options.view_as(SetupOptions).save_main_session = True
>         custom_options = options.view_as(LoadSqlToBqOptions)
>         with beam.Pipeline(options=options) as p:
>             (p
>                 | "Initializing with empty collection" >> beam.Create([1])
>                 | "Reading records from CloudSql" >> 
> beam.ParDo(ReadFromRelationalDBFn(
>                     username=custom_options.user,
>                     password=custom_options.password,
>                     database=custom_options.database,
>                     table=custom_options.table,
>                     key_field=custom_options.key_field,
>                     batch_size=custom_options.batch_size))
>                 | "Converting Row Object for BigQuery" >> 
> beam.ParDo(BuildForBigQueryFn(custom_options.bq_schema))
>                 | "Writing to BigQuery" >> beam.io.WriteToBigQuery(
>                         table=custom_options.bq_table,
>                         schema=custom_options.bq_schema,
>                         
> write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
>                         
> create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to