eladshabi opened a new issue #16288:
URL: https://github.com/apache/airflow/issues/16288


   Hi,
   
   When using the BigQuery operator on Cloud Composer with a query that takes more than 1 hour, we get an "[Errno 32] Broken pipe" error after getting a 401 error.
   
   The 401 error appeared 1 hour after the task was created, and it looks like the root cause is an expired API token.
   
   It is important to note that the BQ job itself keeps running, although the Airflow task failed.
   
   Please look at the relevant log parts from the moment the BQ job was triggered (irrelevant information was removed):
   
   ```
   [2021-05-27 02:30:26,577] {base_task_runner.py:98} INFO - Subtask: [2021-05-27 02:30:26,576] {discovery.py:852} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/<PROJECT-ID>/jobs/<BQ-JOB-ID>?alt=json
   
   [2021-05-27 02:30:26,633] {base_task_runner.py:98} INFO - Subtask: [2021-05-27 02:30:26,633] {bigquery_hook.py:856} INFO - Waiting for job to complete : <PROJECT-ID>, <BQ-JOB-ID>
   .
   .
   .
   [2021-05-27 03:25:09,683] {base_task_runner.py:98} INFO - Subtask: [2021-05-27 03:25:09,682] {discovery.py:852} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/<PROJECT-ID>/jobs/<BQ-JOB-ID>?alt=json
   [2021-05-27 03:25:09,711] {base_task_runner.py:98} INFO - Subtask: [2021-05-27 03:25:09,689] {client.py:631} INFO - Refreshing due to a 401 (attempt 1/2)
   [2021-05-27 03:25:09,858] {base_task_runner.py:98} INFO - Subtask: [2021-05-27 03:25:09,858] {sendgrid.py:126} INFO - Email with subject Airflow alert: <...> [failed]> is successfully sent to recipients: <...>
   [2021-05-27 03:25:09,898] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
   [2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 7, in <module>
   [2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask:     exec(compile(f.read(), __file__, 'exec'))
   [2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/bin/airflow", line 27, in <module>
   [2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
   [2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/bin/cli.py", line 392, in run
   [2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
   [2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/utils/db.py", line 50, in wrapper
   [2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
   [2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/models.py", line 1492, in _run_raw_task
   [2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
   [2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 98, in execute
   [2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:     self.create_disposition, self.query_params)
   [2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 499, in run_query
   [2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:     return self.run_with_configuration(configuration)
   [2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 846, in run_with_configuration
   [2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:     jobId=self.running_job_id).execute()
   [2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/oauth2client/util.py", line 135, in positional_wrapper
   [2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask:     return wrapped(*args, **kwargs)
   [2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 833, in execute
   [2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask:     method=str(self.method), body=self.body, headers=self.headers)
   [2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 173, in _retry_request
   [2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask:     raise exception
   [2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask: socket.error: [Errno 32] Broken pipe
   ```
   
   Cloud Composer version - composer-1.7.2, airflow version - 1.9.0.
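
Since the BQ job keeps running after the task fails, one possible workaround might be a status poll that survives a broken connection. The following is only a rough sketch of that idea, not the code in `bigquery_hook.py`: `poll_job_until_done`, `get_job_state`, and `rebuild_client` are hypothetical names, and it is written for Python 3 rather than the Python 2.7 runtime seen in the traceback.

```python
import errno
import time


def poll_job_until_done(get_job_state, rebuild_client,
                        poll_interval=5.0, max_reconnects=3,
                        sleep=time.sleep):
    """Poll a job until it reports 'DONE', reconnecting on a broken pipe.

    get_job_state  -- hypothetical callable returning the job state string
    rebuild_client -- hypothetical callable that creates a fresh API client
                      (new connection, new credentials) for later polls
    Returns the number of reconnects that were needed.
    """
    reconnects = 0
    while True:
        try:
            state = get_job_state()
        except OSError as exc:  # socket.error is an alias of OSError in Python 3
            # Only a broken pipe is treated as recoverable here; anything
            # else, or too many reconnects, is re-raised to fail the task.
            if exc.errno != errno.EPIPE or reconnects >= max_reconnects:
                raise
            reconnects += 1
            rebuild_client()
            continue  # poll again immediately with the fresh client
        if state == "DONE":
            return reconnects
        sleep(poll_interval)
```

The reconnect limit is there so that a connection that stays broken still fails the task instead of looping forever.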
   
   Thanks
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

