This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e84d753015 Fix BigQueryInsertJobOperator cancel_on_kill (#25342)
e84d753015 is described below
commit e84d753015e5606c29537741cdbe8ae08012c3b6
Author: Dalei Li <[email protected]>
AuthorDate: Thu Aug 4 16:23:23 2022 +0200
Fix BigQueryInsertJobOperator cancel_on_kill (#25342)
---
airflow/providers/google/cloud/hooks/bigquery.py | 2 +-
.../providers/google/cloud/operators/bigquery.py | 18 +++++----
.../google/cloud/operators/test_bigquery.py | 46 +++++++++++++++++++++-
3 files changed, 55 insertions(+), 11 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 0049143aea..1e9848d767 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1415,7 +1415,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
location: Optional[str] = None,
) -> None:
"""
- Cancels a job an wait for cancellation to complete
+ Cancel a job and wait for cancellation to complete
:param job_id: id of the job.
:param project_id: Google Cloud Project where the job is running
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 827f82f5f6..3107aaf183 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2143,7 +2143,7 @@ class BigQueryInsertJobOperator(BaseOperator):
hook: BigQueryHook,
job_id: str,
) -> BigQueryJob:
- # Submit a new job and wait for it to complete and get the result.
+ # Submit a new job without waiting for it to complete.
return hook.insert_job(
configuration=self.configuration,
project_id=self.project_id,
@@ -2151,6 +2151,7 @@ class BigQueryInsertJobOperator(BaseOperator):
job_id=job_id,
timeout=self.result_timeout,
retry=self.result_retry,
+ nowait=True,
)
@staticmethod
@@ -2178,7 +2179,6 @@ class BigQueryInsertJobOperator(BaseOperator):
try:
self.log.info("Executing: %s'", self.configuration)
job = self._submit_job(hook, job_id)
- self._handle_job_error(job)
except Conflict:
# If the job already exists retrieve it
job = hook.get_job(
@@ -2186,11 +2186,7 @@ class BigQueryInsertJobOperator(BaseOperator):
location=self.location,
job_id=job_id,
)
- if job.state in self.reattach_states:
- # We are reattaching to a job
- job.result(timeout=self.result_timeout, retry=self.result_retry)
- self._handle_job_error(job)
- else:
+ if job.state not in self.reattach_states:
# Same job configuration so we need force_rerun
raise AirflowException(
f"Job with id: {job_id} already exists and is in {job.state} state. If you "
@@ -2225,10 +2221,16 @@ class BigQueryInsertJobOperator(BaseOperator):
BigQueryTableLink.persist(**persist_kwargs)
self.job_id = job.job_id
- return job.job_id
+ # Wait for the job to complete
+ job.result(timeout=self.result_timeout, retry=self.result_retry)
+ self._handle_job_error(job)
+
+ return self.job_id
def on_kill(self) -> None:
if self.job_id and self.cancel_on_kill:
self.hook.cancel_job( # type: ignore[union-attr]
job_id=self.job_id, project_id=self.project_id, location=self.location
)
+ else:
+ self.log.info('Skipping to cancel job: %s:%s.%s', self.project_id, self.location, self.job_id)
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 7be855a9a0..494749282b 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -23,7 +23,7 @@ import pytest
from google.cloud.bigquery import DEFAULT_RETRY
from google.cloud.exceptions import Conflict
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCheckOperator,
BigQueryConsoleIndexableLink,
@@ -830,6 +830,7 @@ class TestBigQueryInsertJobOperator:
configuration=configuration,
location=TEST_DATASET_LOCATION,
job_id=real_job_id,
+ nowait=True,
project_id=TEST_GCP_PROJECT_ID,
retry=DEFAULT_RETRY,
timeout=None,
@@ -870,6 +871,7 @@ class TestBigQueryInsertJobOperator:
configuration=configuration,
location=TEST_DATASET_LOCATION,
job_id=real_job_id,
+ nowait=True,
project_id=TEST_GCP_PROJECT_ID,
retry=DEFAULT_RETRY,
timeout=None,
@@ -913,6 +915,45 @@ class TestBigQueryInsertJobOperator:
project_id=TEST_GCP_PROJECT_ID,
)
+ @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
+ @mock.patch('airflow.providers.google.cloud.hooks.bigquery.BigQueryJob')
+ def test_on_kill_after_execution_timeout(self, mock_job, mock_hook):
+ job_id = "123456"
+ hash_ = "hash"
+ real_job_id = f"{job_id}_{hash_}"
+
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ }
+ }
+
+ mock_job.job_id = real_job_id
+ mock_job.error_result = False
+ mock_job.result.side_effect = AirflowTaskTimeout()
+
+ mock_hook.return_value.insert_job.return_value = mock_job
+ mock_hook.return_value.generate_job_id.return_value = real_job_id
+
+ op = BigQueryInsertJobOperator(
+ task_id="insert_query_job",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=job_id,
+ project_id=TEST_GCP_PROJECT_ID,
+ cancel_on_kill=True,
+ )
+ with pytest.raises(AirflowTaskTimeout):
+ op.execute(context=MagicMock())
+
+ op.on_kill()
+ mock_hook.return_value.cancel_job.assert_called_once_with(
+ job_id=real_job_id,
+ location=TEST_DATASET_LOCATION,
+ project_id=TEST_GCP_PROJECT_ID,
+ )
+
@mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
def test_execute_failure(self, mock_hook):
job_id = "123456"
@@ -1018,6 +1059,7 @@ class TestBigQueryInsertJobOperator:
configuration=configuration,
location=TEST_DATASET_LOCATION,
job_id=real_job_id,
+ nowait=True,
project_id=TEST_GCP_PROJECT_ID,
retry=DEFAULT_RETRY,
timeout=None,
@@ -1038,7 +1080,7 @@ class TestBigQueryInsertJobOperator:
}
}
- mock_hook.return_value.insert_job.return_value.result.side_effect = Conflict("any")
+ mock_hook.return_value.insert_job.side_effect = Conflict("any")
mock_hook.return_value.generate_job_id.return_value = real_job_id
job = MagicMock(
job_id=real_job_id,