[
https://issues.apache.org/jira/browse/BEAM-12659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17413413#comment-17413413
]
Chamikara Madhusanka Jayalath commented on BEAM-12659:
------------------------------------------------------
Thanks. We try to address user issues as soon as possible, but there's certainly
room to improve, especially when it comes to the open Jira backlog.
For the future, please consider emailing the Beam user (or dev) list for a faster
response. To be honest, most people do not actively monitor Jira issues, so the
user/dev lists usually get quicker replies.
If you have a GCP support contract, that should have a faster response SLO as
well.
In any case, this sounds like a real bug. So thanks for reporting it :).
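As a rough illustration of the failure mode described in the report, here is a
minimal, Beam-free sketch. `JobRef`, `describe_job`, and `try_describe` are
hypothetical names made up for this example; they only mimic the attribute
access shown in the traceback and are not the actual Beam code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobRef:
    """Hypothetical stand-in for a BigQuery JobReference message."""
    projectId: str
    jobId: str
    location: Optional[str] = None


def describe_job(job_reference):
    """Mimics the attribute access seen in the traceback (not Beam's code)."""
    return (job_reference.projectId, job_reference.jobId, job_reference.location)


def try_describe(job_reference):
    """Returns the attribute tuple, or the error message if access fails."""
    try:
        return describe_job(job_reference)
    except AttributeError as exc:
        return str(exc)


# An object carrying the expected attributes works.
ok = try_describe(JobRef("my-project", "job-123", "US"))
# A plain string fails the same way as in the reported traceback.
bad = try_describe("job-123")
print(ok)
print(bad)
```

The string case fails on the first attribute access, which matches the
`AttributeError` on `projectId` in the report.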
> WriteToBigQuery in BundleBasedDirectRunner fails when method is FILE_LOADS
> --------------------------------------------------------------------------
>
> Key: BEAM-12659
> URL: https://issues.apache.org/jira/browse/BEAM-12659
> Project: Beam
> Issue Type: Bug
> Components: io-py-gcp, runner-py-direct
> Affects Versions: 2.31.0
> Environment: Ubuntu 20.04 (running inside Docker, python:3.8-slim)
> Reporter: Miles McCain
> Assignee: Chamikara Madhusanka Jayalath
> Priority: P2
> Labels: GCP, newbie
>
> `WriteToBigQuery` fails when using the `FILE_LOADS` method in the
> `BundleBasedDirectRunner`.
> The issue appears to be in `wait_for_bq_job`, where the function expects
> `job_reference` to be an actual JobReference instance and not a string.
> However, the `WaitForBQJobs` DoFn appears to be [passing a
> string](https://github.com/apache/beam/blob/5a029fd97d663e19a9bcd6bff61648bccbd7f95b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L730)
> as the argument. I believe this is during the copy step, and I'm not calling
> this code directly (so unfortunately I can't just pass a TableReference
> instance myself).
> Here is a traceback:
>
> {code:java}
> request_worker_1 | ERROR:root:Traceback (most recent call last):
> request_worker_1 |   File "/app/main.py", line 209, in process_message
> request_worker_1 |     construct_and_run_pipeline(request)
> request_worker_1 |   File "/app/main.py", line 190, in construct_and_run_pipeline
> request_worker_1 |     return result.wait_until_finish()
> request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 588, in wait_until_finish
> request_worker_1 |     self._executor.await_completion()
> request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 433, in await_completion
> request_worker_1 |     self._executor.await_completion()
> request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 482, in await_completion
> request_worker_1 |     raise t(v).with_traceback(tb)
> request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 371, in call
> request_worker_1 |     self.attempt_call(
> request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 414, in attempt_call
> request_worker_1 |     evaluator.process_element(value)
> request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 880, in process_element
> request_worker_1 |     self.runner.process(element)
> request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1225, in process
> request_worker_1 |     self._reraise_augmented(exn)
> request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1306, in _reraise_augmented
> request_worker_1 |     raise new_exn.with_traceback(tb)
> request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1223, in process
> request_worker_1 |     return self.do_fn_invoker.invoke_process(windowed_value)
> request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 752, in invoke_process
> request_worker_1 |     self._invoke_process_per_window(
> request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 877, in _invoke_process_per_window
> request_worker_1 |     self.process_method(*args_for_process),
> request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process
> request_worker_1 |     self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
> request_worker_1 |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 562, in wait_for_bq_job
> request_worker_1 |     job_reference.projectId, job_reference.jobId, job_reference.location)
> request_worker_1 | AttributeError: 'str' object has no attribute 'projectId' [while running 'write tweets to bigquery/Write/BigQueryBatchFileLoads/WaitForTempTableLoadJobs']
> {code}
>
> Here is the `WriteToBigQuery` step that is failing (note that the callable
> passed for `table` returns a TableReference instance):
> {code:java}
> WriteToBigQuery(
>     table=lambda row: bigquery_tools.parse_table_reference(row["table_name"]),
>     create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
>     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
>     ignore_insert_ids=True,
>     method="FILE_LOADS",  # using STREAMING_INSERTS 'fixes' the issue
>     batch_size=int(os.getenv("BIGQUERY_BATCH_SIZE", "10")),
>     schema=schema,
> )
> {code}
>
> Note that this issue does not occur when using the standard `DirectRunner`,
> nor does it occur when using the `STREAMING_INSERTS` method.
> Thanks! (And apologies if I left out any important information. This is the
> first issue I've opened here.)