This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c141383d5bf fix: api_version on on_kill method (#46833)
c141383d5bf is described below
commit c141383d5bf70e59d03d64490f9e68cefcdabda3
Author: Danton Bertuol <[email protected]>
AuthorDate: Thu Feb 20 15:39:11 2025 -0300
fix: api_version on on_kill method (#46833)
---
.../src/airflow/providers/airbyte/operators/airbyte.py | 2 +-
.../tests/unit/airbyte/operators/test_airbyte.py | 17 +++++++++++++++++
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
index 757de753528..dd494e855ae 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
@@ -133,7 +133,7 @@ class AirbyteTriggerSyncOperator(BaseOperator):
def on_kill(self):
"""Cancel the job if task is cancelled."""
- hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_type=self.api_type)
+ hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
if self.job_id:
self.log.info("on_kill: cancel the airbyte Job %s", self.job_id)
hook.cancel_job(self.job_id)
diff --git a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
index ffbddcd7e5b..f13cf2fa30c 100644
--- a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
+++ b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
@@ -69,3 +69,20 @@ class TestAirbyteTriggerSyncOp:
mock_wait_for_job.assert_called_once_with(
job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout
)
+
+ @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.cancel_job")
+ def test_on_kill(self, mock_cancel_job):
+ conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
+ db.merge_conn(conn)
+
+ op = AirbyteTriggerSyncOperator(
+ task_id="test_Airbyte_op",
+ airbyte_conn_id=self.airbyte_conn_id,
+ connection_id=self.connection_id,
+ wait_seconds=self.wait_seconds,
+ timeout=self.timeout,
+ )
+ op.job_id = self.job_id
+ op.on_kill()
+
+ mock_cancel_job.assert_called_once_with(self.job_id)