This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c118b2836f `AirbyteHook` add cancel job option (#24593)
c118b2836f is described below
commit c118b2836f7211a0c3762cff8634b7b9a0d1cf0b
Author: sivankumar86 <[email protected]>
AuthorDate: Wed Jun 29 16:43:53 2022 +1000
`AirbyteHook` add cancel job option (#24593)
---
airflow/providers/airbyte/hooks/airbyte.py | 12 ++++++++++++
airflow/providers/airbyte/operators/airbyte.py | 22 ++++++++++++++--------
tests/providers/airbyte/hooks/test_airbyte.py | 9 +++++++++
3 files changed, 35 insertions(+), 8 deletions(-)
diff --git a/airflow/providers/airbyte/hooks/airbyte.py
b/airflow/providers/airbyte/hooks/airbyte.py
index b1f6317530..ab0d7e4baf 100644
--- a/airflow/providers/airbyte/hooks/airbyte.py
+++ b/airflow/providers/airbyte/hooks/airbyte.py
@@ -107,6 +107,18 @@ class AirbyteHook(HttpHook):
headers={"accept": "application/json"},
)
+ def cancel_job(self, job_id: int) -> Any:
+ """
+ Cancel the Airbyte job with the given id via the ``jobs/cancel`` API.
+
+ :param job_id: Required. Id of the Airbyte job to cancel; the result of ``self.run`` is returned
+ """
+ return self.run(
+ endpoint=f"api/{self.api_version}/jobs/cancel",
+ json={"id": job_id},
+ headers={"accept": "application/json"},
+ )
+
def test_connection(self):
"""Tests the Airbyte connection by hitting the health API"""
self.method = 'GET'
diff --git a/airflow/providers/airbyte/operators/airbyte.py
b/airflow/providers/airbyte/operators/airbyte.py
index ef2e2c1559..7677795a6a 100644
--- a/airflow/providers/airbyte/operators/airbyte.py
+++ b/airflow/providers/airbyte/operators/airbyte.py
@@ -67,14 +67,20 @@ class AirbyteTriggerSyncOperator(BaseOperator):
def execute(self, context: 'Context') -> None:
"""Create Airbyte Job and wait to finish"""
- hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id,
api_version=self.api_version)
- job_object =
hook.submit_sync_connection(connection_id=self.connection_id)
- job_id = job_object.json()['job']['id']
+ self.hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id,
api_version=self.api_version)
+ job_object =
self.hook.submit_sync_connection(connection_id=self.connection_id)
+ self.job_id = job_object.json()['job']['id']
- self.log.info("Job %s was submitted to Airbyte Server", job_id)
+ self.log.info("Job %s was submitted to Airbyte Server", self.job_id)
if not self.asynchronous:
- self.log.info('Waiting for job %s to complete', job_id)
- hook.wait_for_job(job_id=job_id, wait_seconds=self.wait_seconds,
timeout=self.timeout)
- self.log.info('Job %s completed successfully', job_id)
+ self.log.info('Waiting for job %s to complete', self.job_id)
+ self.hook.wait_for_job(job_id=self.job_id,
wait_seconds=self.wait_seconds, timeout=self.timeout)
+ self.log.info('Job %s completed successfully', self.job_id)
- return job_id
+ return self.job_id
+
+ def on_kill(self):
+ """Cancel the Airbyte job on kill; no-op if execute() has not yet stored a job_id"""
+ if getattr(self, 'job_id', None) is not None:
+ self.log.info('on_kill: cancel the airbyte Job %s', self.job_id)
+ self.hook.cancel_job(self.job_id)
diff --git a/tests/providers/airbyte/hooks/test_airbyte.py
b/tests/providers/airbyte/hooks/test_airbyte.py
index 77432cdec4..31b1d0ea81 100644
--- a/tests/providers/airbyte/hooks/test_airbyte.py
+++ b/tests/providers/airbyte/hooks/test_airbyte.py
@@ -38,9 +38,11 @@ class TestAirbyteHook(unittest.TestCase):
job_id = 1
sync_connection_endpoint =
'http://test-airbyte:8001/api/v1/connections/sync'
get_job_endpoint = 'http://test-airbyte:8001/api/v1/jobs/get'
+ cancel_job_endpoint = 'http://test-airbyte:8001/api/v1/jobs/cancel'
health_endpoint = 'http://test-airbyte:8001/api/v1/health'
_mock_sync_conn_success_response_body = {'job': {'id': 1}}
_mock_job_status_success_response_body = {'job': {'status': 'succeeded'}}
+ _mock_job_cancel_status = 'cancelled'
def setUp(self):
db.merge_conn(
@@ -71,6 +73,13 @@ class TestAirbyteHook(unittest.TestCase):
assert resp.status_code == 200
assert resp.json() == self._mock_job_status_success_response_body
+ @requests_mock.mock()
+ def test_cancel_job(self, m):
+ m.post(self.cancel_job_endpoint, status_code=200,
json={'job': {'status': self._mock_job_cancel_status}})
+ resp = self.hook.cancel_job(job_id=self.job_id)
+ assert resp.status_code == 200
+ assert m.last_request.json() == {'id': self.job_id}
+
@mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
def test_wait_for_job_succeeded(self, mock_get_job):
mock_get_job.side_effect =
[self.return_value_get_job(self.hook.SUCCEEDED)]