Miles McCain created BEAM-12659:
-----------------------------------
Summary: WriteToBigQuery in BundleBasedDirectRunner fails when
method is FILE_LOADS
Key: BEAM-12659
URL: https://issues.apache.org/jira/browse/BEAM-12659
Project: Beam
Issue Type: Bug
Components: io-py-gcp, runner-py-direct
Affects Versions: 2.31.0
Environment: Ubuntu 20.04 (running inside Docker, python:3.8-slim)
Reporter: Miles McCain
`WriteToBigQuery` fails when using the `FILE_LOADS` method in the
`BundleBasedDirectRunner`.
The issue appears to be in `wait_for_bq_job`, where the function expects
`job_reference` to be an actual JobReference instance and not a string.
However, the `WaitForBQJobs` DoFn appears to be [passing a
string](https://github.com/apache/beam/blob/5a029fd97d663e19a9bcd6bff61648bccbd7f95b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L730)
as the argument. I believe this is during the copy step, and I'm not calling
this code directly (so unfortunately I can't just pass a JobReference
instance myself).
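To illustrate the suspected mismatch, here is a minimal sketch that does not use Beam at all. `FakeJobReference` and `wait_for_job` are hypothetical stand-ins (not Beam APIs); they mimic only the attribute access visible in the traceback, on the assumption that a plain string is what reaches `wait_for_bq_job`:

```python
from dataclasses import dataclass


@dataclass
class FakeJobReference:
    """Hypothetical stand-in for a BigQuery JobReference object."""
    projectId: str
    jobId: str
    location: str


def wait_for_job(job_reference):
    """Hypothetical helper: reads the same three attributes as the traceback."""
    return (job_reference.projectId, job_reference.jobId, job_reference.location)


# A proper reference object exposes all three attributes.
ok = wait_for_job(FakeJobReference("my-project", "job-123", "US"))

# A plain string has none of them, so the first attribute access fails.
try:
    wait_for_job("job-123")
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(ok)
print(error_message)
```

If this reading is right, the string case fails on `projectId` first, which matches the `AttributeError` in the traceback.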
Here is a traceback:
```
request_worker_1 | ERROR:root:Traceback (most recent call last):
request_worker_1 |   File "/app/main.py", line 209, in process_message
request_worker_1 |     construct_and_run_pipeline(request)
request_worker_1 |   File "/app/main.py", line 190, in construct_and_run_pipeline
request_worker_1 |     return result.wait_until_finish()
request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 588, in wait_until_finish
request_worker_1 |     self._executor.await_completion()
request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 433, in await_completion
request_worker_1 |     self._executor.await_completion()
request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 482, in await_completion
request_worker_1 |     raise t(v).with_traceback(tb)
request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 371, in call
request_worker_1 |     self.attempt_call(
request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 414, in attempt_call
request_worker_1 |     evaluator.process_element(value)
request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 880, in process_element
request_worker_1 |     self.runner.process(element)
request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1225, in process
request_worker_1 |     self._reraise_augmented(exn)
request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1306, in _reraise_augmented
request_worker_1 |     raise new_exn.with_traceback(tb)
request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1223, in process
request_worker_1 |     return self.do_fn_invoker.invoke_process(windowed_value)
request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 752, in invoke_process
request_worker_1 |     self._invoke_process_per_window(
request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 877, in _invoke_process_per_window
request_worker_1 |     self.process_method(*args_for_process),
request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process
request_worker_1 |     self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 562, in wait_for_bq_job
request_worker_1 |     job_reference.projectId, job_reference.jobId, job_reference.location)
request_worker_1 | AttributeError: 'str' object has no attribute 'projectId' [while running 'write tweets to bigquery/Write/BigQueryBatchFileLoads/WaitForTempTableLoadJobs']
```
Here is the `WriteToBigQuery` step that is failing (note that the callable
passed for `table` returns a TableReference instance):
```python
WriteToBigQuery(
    table=lambda row: bigquery_tools.parse_table_reference(row["table_name"]),
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    ignore_insert_ids=True,
    # method="STREAMING_INSERTS",  # using STREAMING_INSERTS 'fixes' the issue
    batch_size=int(os.getenv("BIGQUERY_BATCH_SIZE", "10")),
    schema=schema,
)
```
Note that this issue does not occur when using the standard `DirectRunner`, nor
does it occur when using the `STREAMING_INSERTS` method.
Thanks! (And apologies if I left out any important information. This is the
first issue I've opened here.)