Repository: incubator-airflow Updated Branches: refs/heads/master 85127769d -> 15feb7dd3
[AIRFLOW-1683] Cancel BigQuery job on timeout. This change causes the BigQuery job to be canceled when the task that started it is killed, for example on execution timeout, reducing wasted resources. Closes #2665 from janczak10/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15feb7dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15feb7dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15feb7dd Branch: refs/heads/master Commit: 15feb7dd3f39ba7926ae5817d488e4e54a3d7742 Parents: 8512776 Author: Maria Janczak <[email protected]> Authored: Mon Oct 16 13:45:17 2017 -0700 Committer: Chris Riccomini <[email protected]> Committed: Mon Oct 16 13:45:26 2017 -0700 ---------------------------------------------------------------------- airflow/contrib/hooks/bigquery_hook.py | 54 +++++++++++++++++++-- airflow/contrib/operators/bigquery_operator.py | 21 +++++--- tests/contrib/hooks/test_bigquery_hook.py | 28 +++++++++++ 3 files changed, 92 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15feb7dd/airflow/contrib/hooks/bigquery_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index 5fc7e22..2cfef57 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -185,6 +185,7 @@ class BigQueryBaseCursor(LoggingMixin): def __init__(self, service, project_id): self.service = service self.project_id = project_id + self.running_job_id = None def run_query( self, bql, destination_dataset_table = False, @@ -559,13 +560,13 @@ class BigQueryBaseCursor(LoggingMixin): query_reply = jobs \ .insert(projectId=self.project_id, body=job_data) \ .execute() - job_id = query_reply['jobReference']['jobId'] + self.running_job_id = query_reply['jobReference']['jobId'] # Wait for query to finish. keep_polling_job = True while (keep_polling_job): try: - job = jobs.get(projectId=self.project_id, jobId=job_id).execute() + job = jobs.get(projectId=self.project_id, jobId=self.running_job_id).execute() if (job['status']['state'] == 'DONE'): keep_polling_job = False # Check if job had errors. @@ -576,18 +577,61 @@ class BigQueryBaseCursor(LoggingMixin): ) ) else: - self.log.info('Waiting for job to complete : %s, %s', self.project_id, job_id) + self.log.info('Waiting for job to complete : %s, %s', self.project_id, self.running_job_id) time.sleep(5) except HttpError as err: if err.resp.status in [500, 503]: - self.log.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id) + self.log.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, self.running_job_id) time.sleep(5) else: raise Exception( 'BigQuery job status check failed. Final error was: %s', err.resp.status) - return job_id + return self.running_job_id + + def poll_job_complete(self, job_id): + jobs = self.service.jobs() + try: + job = jobs.get(projectId=self.project_id, jobId=job_id).execute() + if (job['status']['state'] == 'DONE'): + return True + except HttpError as err: + if err.resp.status in [500, 503]: + self.log.info('%s: Retryable error while polling job with id %s', err.resp.status, job_id) + else: + raise Exception( + 'BigQuery job status check failed. Final error was: %s', err.resp.status) + return False + + + def cancel_query(self): + """ + Cancel all started queries that have not yet completed + """ + jobs = self.service.jobs() + if (self.running_job_id and not self.poll_job_complete(self.running_job_id)): + self.log.info('Attempting to cancel job : %s, %s', self.project_id, self.running_job_id) + jobs.cancel(projectId=self.project_id, jobId=self.running_job_id).execute() + else: + self.log.info('No running BigQuery jobs to cancel.') + return + + # Wait for all the calls to cancel to finish + max_polling_attempts = 12 + polling_attempts = 0 + + job_complete = False + while (polling_attempts < max_polling_attempts and not job_complete): + polling_attempts = polling_attempts+1 + job_complete = self.poll_job_complete(self.running_job_id) + if (job_complete): + self.log.info('Job successfully canceled: %s, %s', self.project_id, self.running_job_id) + elif(polling_attempts == max_polling_attempts): + self.log.info('Stopping polling due to timeout. Job with id %s has not completed cancel and may or may not finish.', self.running_job_id) + else: + self.log.info('Waiting for canceled job with id %s to finish.', self.running_job_id) + time.sleep(5) def get_schema(self, dataset_id, table_id): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15feb7dd/airflow/contrib/operators/bigquery_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py index a2ba824..19efa55 100644 --- a/airflow/contrib/operators/bigquery_operator.py +++ b/airflow/contrib/operators/bigquery_operator.py @@ -85,14 +85,23 @@ class BigQueryOperator(BaseOperator): self.use_legacy_sql = use_legacy_sql self.maximum_billing_tier = maximum_billing_tier self.query_params = query_params + self.bq_cursor = None def execute(self, context): - self.log.info('Executing: %s', self.bql) - hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, - delegate_to=self.delegate_to) - conn = hook.get_conn() - cursor = conn.cursor() - cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition, + if(self.bq_cursor == None): + self.log.info('Executing: %s', self.bql) + hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, + delegate_to=self.delegate_to) + conn = hook.get_conn() + self.bq_cursor = conn.cursor() + self.bq_cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition, self.allow_large_results, self.udf_config, self.use_legacy_sql, self.maximum_billing_tier, self.create_disposition, self.query_params) + + + def on_kill(self): + super(BigQueryOperator, self).on_kill() + if(self.bq_cursor!=None): + self.log.info('Canceling running query due to execution timeout') + self.bq_cursor.cancel_query() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15feb7dd/tests/contrib/hooks/test_bigquery_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py index 2fe9ddb..0365bba 100644 --- a/tests/contrib/hooks/test_bigquery_hook.py +++ b/tests/contrib/hooks/test_bigquery_hook.py @@ -14,6 +14,7 @@ # import unittest +import mock from airflow.contrib.hooks import bigquery_hook as hook from oauth2client.contrib.gce import HttpAccessTokenRefreshError @@ -163,6 +164,14 @@ class TestBigQueryHookSourceFormat(unittest.TestCase): # since we passed 'json' in, and it's not valid, make sure it's present in the error string. self.assertIn("JSON", str(context.exception)) +# Helpers to test_cancel_queries that have mock_poll_job_complete returning false, unless mock_job_cancel was called with the same job_id +mock_canceled_jobs = [] +def mock_poll_job_complete(job_id): + return job_id in mock_canceled_jobs + +def mock_job_cancel(projectId, jobId): + mock_canceled_jobs.append(jobId) + return mock.Mock() class TestBigQueryBaseCursor(unittest.TestCase): def test_invalid_schema_update_options(self): @@ -185,6 +194,25 @@ class TestBigQueryBaseCursor(unittest.TestCase): write_disposition='WRITE_EMPTY' ) self.assertIn("schema_update_options is only", str(context.exception)) + + @mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin") + @mock.patch("airflow.contrib.hooks.bigquery_hook.time") + def test_cancel_queries(self, mocked_logging, mocked_time): + project_id = 12345 + running_job_id = 3 + + mock_jobs = mock.Mock() + mock_jobs.cancel = mock.Mock(side_effect=mock_job_cancel) + mock_service = mock.Mock() + mock_service.jobs = mock.Mock(return_value=mock_jobs) + + bq_hook = hook.BigQueryBaseCursor(mock_service, project_id) + bq_hook.running_job_id = running_job_id + bq_hook.poll_job_complete = mock.Mock(side_effect=mock_poll_job_complete) + + bq_hook.cancel_query() + + mock_jobs.cancel.assert_called_with(projectId=project_id, jobId=running_job_id) if __name__ == '__main__': unittest.main()
