eladshabi opened a new issue #16288:
URL: https://github.com/apache/airflow/issues/16288
Hi,
When using the BigQuery operator on Cloud Composer with a query that takes more
than 1 hour, the task fails with "[Errno 32] Broken pipe" after getting a 401
error. The 401 error appears 1 hour after the task was created, so the root
cause looks like an expired API token.
Note that the BQ job itself keeps running even though the Airflow task has
failed.
Below are the relevant log parts, starting from the moment the BQ job was
triggered (irrelevant information was removed):
```
[2021-05-27 02:30:26,577] {base_task_runner.py:98} INFO - Subtask:
[2021-05-27 02:30:26,576] {discovery.py:852} INFO - URL being requested: GET
https://bigquery.googleapis.com/bigquery/v2/projects/<PROJECT-ID>/jobs/<BQ-JOB-ID>?alt=json
[2021-05-27 02:30:26,633] {base_task_runner.py:98} INFO - Subtask:
[2021-05-27 02:30:26,633] {bigquery_hook.py:856} INFO - Waiting for job to
complete : <PROJECT-ID>, <BQ-JOB-ID>
.
.
.
[2021-05-27 03:25:09,683] {base_task_runner.py:98} INFO - Subtask:
[2021-05-27 03:25:09,682] {discovery.py:852} INFO - URL being requested: GET
https://bigquery.googleapis.com/bigquery/v2/projects/<PROJECT-ID>/jobs/<BQ-JOB-ID>?alt=json
[2021-05-27 03:25:09,711] {base_task_runner.py:98} INFO - Subtask:
[2021-05-27 03:25:09,689] {client.py:631} INFO - Refreshing due to a 401
(attempt 1/2)
[2021-05-27 03:25:09,858] {base_task_runner.py:98} INFO - Subtask:
[2021-05-27 03:25:09,858] {sendgrid.py:126} INFO - Email with subject Airflow
alert: <...> [failed]> is successfully sent to recipients: <...>
[2021-05-27 03:25:09,898] {base_task_runner.py:98} INFO - Subtask: Traceback
(most recent call last):
[2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask: File
"/usr/local/bin/airflow", line 7, in <module>
[2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask:
exec(compile(f.read(), __file__, 'exec'))
[2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask: File
"/usr/local/lib/airflow/airflow/bin/airflow", line 27, in <module>
[2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask:
args.func(args)
[2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask: File
"/usr/local/lib/airflow/airflow/bin/cli.py", line 392, in run
[2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:
pool=args.pool,
[2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask: File
"/usr/local/lib/airflow/airflow/utils/db.py", line 50, in wrapper
[2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:
result = func(*args, **kwargs)
[2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask: File
"/usr/local/lib/airflow/airflow/models.py", line 1492, in _run_raw_task
[2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:
result = task_copy.execute(context=context)
[2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask: File
"/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line
98, in execute
[2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:
self.create_disposition, self.query_params)
[2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask: File
"/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 499, in
run_query
[2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:
return self.run_with_configuration(configuration)
[2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask: File
"/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 846, in
run_with_configuration
[2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:
jobId=self.running_job_id).execute()
[2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask: File
"/usr/local/lib/python2.7/dist-packages/oauth2client/util.py", line 135, in
positional_wrapper
[2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask:
return wrapped(*args, **kwargs)
[2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask: File
"/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 833, in
execute
[2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask:
method=str(self.method), body=self.body, headers=self.headers)
[2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask: File
"/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 173, in
_retry_request
[2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask: raise
exception
[2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask:
socket.error: [Errno 32] Broken pipe
```
Cloud Composer version: composer-1.7.2. Airflow version: 1.9.0.
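
To illustrate the behaviour I would expect, here is a minimal sketch of a polling loop that survives a dropped connection by rebuilding the client instead of failing the task. This is not the code in bigquery_hook.py: `poll_until_done`, `get_job_state` and `rebuild_client` are hypothetical names made up for this example, and the sketch assumes that recreating the client (and with it the token) is enough to recover.

```python
import errno
import socket
import time


def poll_until_done(get_job_state, rebuild_client, poll_interval=5.0,
                    max_reconnects=3, sleep=time.sleep):
    """Poll a job until it reports DONE, reconnecting on a broken pipe.

    get_job_state and rebuild_client are hypothetical callables supplied by
    the caller: the first returns the job state as a string, the second
    recreates the API client with fresh credentials.
    """
    reconnects = 0  # consecutive reconnect attempts
    while True:
        try:
            state = get_job_state()
        except socket.error as exc:
            # Only handle "[Errno 32] Broken pipe"; re-raise anything else,
            # and give up once the reconnect budget is used.
            if exc.errno != errno.EPIPE or reconnects >= max_reconnects:
                raise
            reconnects += 1
            rebuild_client()
            continue
        reconnects = 0  # a successful poll resets the budget
        if state == "DONE":
            return state
        sleep(poll_interval)
```

Since the BQ job keeps running on the server side, polling the same job ID again after reconnecting should be safe, as opposed to submitting the query a second time.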
Thanks