damccorm opened a new issue, #21061:
URL: https://github.com/apache/beam/issues/21061

   `WriteToBigQuery` fails when using the `FILE_LOADS` method in the 
`BundleBasedDirectRunner`.
   
   The issue appears to be in `wait_for_bq_job`, where the function expects 
`job_reference` to be an actual JobReference instance and not a string. 
However, the `WaitForBQJobs` DoFn appears to be [passing a 
string](https://github.com/apache/beam/blob/5a029fd97d663e19a9bcd6bff61648bccbd7f95b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L730)
 as the argument. I believe this is during the copy step, and I'm not calling 
this code directly (so unfortunately I can't just pass a TableReference 
instance myself).
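
To illustrate the suspected failure mode, here is a minimal sketch. It does not use Beam's actual classes: `FakeJobReference`, `wait_for_job`, and `try_wait` are hypothetical stand-ins that only mirror the attribute access (`projectId`, `jobId`, `location`) visible in the traceback. It shows why passing a plain string where a reference object is expected would raise the same kind of `AttributeError`.

```python
from dataclasses import dataclass


@dataclass
class FakeJobReference:
    """Hypothetical stand-in for a BigQuery job reference (not Beam's class)."""
    projectId: str
    jobId: str
    location: str


def wait_for_job(job_reference):
    """Hypothetical stand-in that reads the same attributes as the traceback."""
    return (job_reference.projectId, job_reference.jobId, job_reference.location)


def try_wait(ref):
    """Return the attribute tuple, or the error message if the lookup fails."""
    try:
        return wait_for_job(ref)
    except AttributeError as exc:
        return str(exc)


# A reference-like object works; a bare string does not.
ok = try_wait(FakeJobReference("my-project", "job-123", "US"))
bad = try_wait("job-123")
print(ok)
print(bad)
```

If this reading is right, the fix would belong at the call site, which should pass the reference object rather than a string. That is an assumption based on the traceback, not a confirmed diagnosis.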
   
   Here is a traceback:
   
    
   ```
   request_worker_1      | ERROR:root:Traceback (most recent call last):
   request_worker_1      |   File "/app/main.py", line 209, in process_message
   request_worker_1      |     construct_and_run_pipeline(request)
   request_worker_1      |   File "/app/main.py", line 190, in construct_and_run_pipeline
   request_worker_1      |     return result.wait_until_finish()
   request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 588, in wait_until_finish
   request_worker_1      |     self._executor.await_completion()
   request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 433, in await_completion
   request_worker_1      |     self._executor.await_completion()
   request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 482, in await_completion
   request_worker_1      |     raise t(v).with_traceback(tb)
   request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 371, in call
   request_worker_1      |     self.attempt_call(
   request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 414, in attempt_call
   request_worker_1      |     evaluator.process_element(value)
   request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 880, in process_element
   request_worker_1      |     self.runner.process(element)
   request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1225, in process
   request_worker_1      |     self._reraise_augmented(exn)
   request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1306, in _reraise_augmented
   request_worker_1      |     raise new_exn.with_traceback(tb)
   request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1223, in process
   request_worker_1      |     return self.do_fn_invoker.invoke_process(windowed_value)
   request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 752, in invoke_process
   request_worker_1      |     self._invoke_process_per_window(
   request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 877, in _invoke_process_per_window
   request_worker_1      |     self.process_method(*args_for_process),
   request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process
   request_worker_1      |     self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
   request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 562, in wait_for_bq_job
   request_worker_1      |     job_reference.projectId, job_reference.jobId, job_reference.location)
   request_worker_1      | AttributeError: 'str' object has no attribute 'projectId' [while running 'write tweets to bigquery/Write/BigQueryBatchFileLoads/WaitForTempTableLoadJobs']
   ```
   
   Here is the `WriteToBigQuery` step that is failing (note that the callable 
passed for `table` returns a TableReference instance):
   ```
   import os

   import apache_beam as beam
   from apache_beam.io.gcp import bigquery_tools
   from apache_beam.io.gcp.bigquery import WriteToBigQuery

   # `schema` is defined elsewhere in the pipeline.
   WriteToBigQuery(
       table=lambda row: bigquery_tools.parse_table_reference(row["table_name"]),
       create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
       ignore_insert_ids=True,
       method="FILE_LOADS",  # using STREAMING_INSERTS 'fixes' the issue
       batch_size=int(os.getenv("BIGQUERY_BATCH_SIZE", "10")),
       schema=schema,
   )
   ```
   
   Note that this issue does not occur when using the standard `DirectRunner`, 
nor does it occur when using the `STREAMING_INSERTS` method.
   
   Thanks! (And apologies if I left out any important information. This is the 
first issue I've opened here.)
   
   Imported from Jira 
[BEAM-12659](https://issues.apache.org/jira/browse/BEAM-12659). Original Jira 
may contain additional context.
   Reported by: milesmcc.

