Repository: incubator-airflow
Updated Branches:
  refs/heads/master 67ab416db -> cac133001
[AIRFLOW-667] Handle BigQuery 503 error

Closes #1938 from krmettu/master

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cac13300
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cac13300
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cac13300

Branch: refs/heads/master
Commit: cac133001b517dd7d66a31288dd375eb63d8cd26
Parents: 67ab416
Author: Krishnaveni Mettu <[email protected]>
Authored: Wed Dec 14 13:02:48 2016 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Wed Dec 14 13:02:56 2016 -0800

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py | 37 +++++++++++++++++++----------
 1 file changed, 24 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cac13300/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index a0cb71d..900ec12 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -470,21 +470,32 @@ class BigQueryBaseCursor(object):
             .insert(projectId=self.project_id, body=job_data) \
             .execute()
         job_id = query_reply['jobReference']['jobId']
-        job = jobs.get(projectId=self.project_id, jobId=job_id).execute()
 
         # Wait for query to finish.
-        while not job['status']['state'] == 'DONE':
-            logging.info('Waiting for job to complete: %s, %s', self.project_id, job_id)
-            time.sleep(5)
-            job = jobs.get(projectId=self.project_id, jobId=job_id).execute()
-
-        # Check if job had errors.
-        if 'errorResult' in job['status']:
-            raise Exception(
-                'BigQuery job failed. Final error was: {}. The job was: {}'.format(
-                    job['status']['errorResult'], job
-                )
-            )
+        keep_polling_job = True
+        while (keep_polling_job):
+            try:
+                job = jobs.get(projectId=self.project_id, jobId=job_id).execute()
+                if (job['status']['state'] == 'DONE'):
+                    keep_polling_job = False
+                    # Check if job had errors.
+                    if 'errorResult' in job['status']:
+                        raise Exception(
+                            'BigQuery job failed. Final error was: {}. The job was: {}'.format(
+                                job['status']['errorResult'], job
+                            )
+                        )
+                else:
+                    logging.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
+                    time.sleep(5)
+
+            except HttpError, err:
+                if err.code in [500, 503]:
+                    logging.info('%s: Retryable error, waiting for job to complete: %s', err.code, job_id)
+                    time.sleep(5)
+                else:
+                    raise Exception(
+                        'BigQuery job status check faild. Final error was: %s', err.code)
 
         return job_id
 
