Copilot commented on code in PR #64119:
URL: https://github.com/apache/airflow/pull/64119#discussion_r3025331486
##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -127,7 +127,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
"""
if event["status"] == "error":
self.log.debug("Error occurred with context: %s", context)
- raise AirflowException(event["message"])
+ raise RuntimeError(event["message"])
self.log.info("%s completed successfully.", self.task_id)
return None
Review Comment:
`execute_complete()` treats any non-`"error"` event as success. The trigger
can emit `{"status": "cancelled", ...}` (see `AirbyteSyncTrigger.run()`), and
the non-deferrable `execute()` path treats `JobStatusEnum.CANCELLED` as a
failure (raises). This makes deferrable runs incorrectly succeed on cancelled
Airbyte jobs. Handle `"cancelled"` (and any unknown status) explicitly—e.g.
only treat `"success"` as success and raise for `"cancelled"`/`"error"`/unknown.
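A minimal sketch of the stricter handling suggested above, not the operator's actual code. `AirflowException` is redefined as a local stand-in so the snippet runs without Airflow installed, and `check_trigger_event` is a hypothetical helper name; in the operator this logic would live inside `execute_complete()`.

```python
class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption for this sketch)."""


def check_trigger_event(event):
    """Return None only when the trigger reports success; raise for anything else."""
    if event is None:
        raise AirflowException("Trigger returned no event.")

    status = event.get("status")
    if status == "success":
        return None

    # "error", "cancelled", and any unknown status are all treated as failures.
    message = event.get("message", "")
    raise AirflowException(f"Airbyte job did not succeed (status={status!r}): {message}")
```

Allow-listing `"success"` rather than deny-listing `"error"` means a status the trigger adds later fails loudly instead of silently passing.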
##########
providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py:
##########
@@ -67,6 +68,52 @@ def test_execute(self, mock_wait_for_job, mock_submit_sync_connection, create_co
job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout
)
+ @pytest.mark.parametrize("status", ["success", "cancelled"])
+ def test_execute_complete_non_error_states(self, status, create_connection_without_db):
+ conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
+ create_connection_without_db(conn)
+
+ op = AirbyteTriggerSyncOperator(
+ task_id="test_airbyte_op",
+ airbyte_conn_id=self.airbyte_conn_id,
+ connection_id=self.connection_id,
+ wait_seconds=self.wait_seconds,
+ timeout=self.timeout,
+ deferrable=True,
+ )
+
+ event = {
+ "status": status,
+ "message": "succeeded/cancelled",
+ "job_id": self.job_id,
+ }
+
+ result = op.execute_complete(context={}, event=event)
+
+ assert result is None
Review Comment:
This test asserts that `execute_complete()` succeeds when
`event["status"] == "cancelled"`, but the operator’s `execute()` method treats a cancelled
Airbyte job as a failure (it raises on `JobStatusEnum.CANCELLED`). The trigger
also emits `status="cancelled"`. The test should expect an exception for
`cancelled` (or be removed) and a separate test should assert success only for
`status="success"`.
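One way the split tests could look, sketched with the standard library's `unittest` so it runs on its own; in the provider test suite the same cases would more naturally use `pytest.mark.parametrize` and the real operator. Here `AirflowException` and `execute_complete` are local stand-ins (assumptions), with `execute_complete` modelling the stricter behaviour rather than the code in this PR.

```python
import unittest


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption for this sketch)."""


def execute_complete(event):
    """Hypothetical strict handler: only a "success" event passes."""
    if event["status"] != "success":
        raise AirflowException(event["message"])
    return None


class TestExecuteComplete(unittest.TestCase):
    def test_success_returns_none(self):
        event = {"status": "success", "message": "succeeded", "job_id": 1}
        self.assertIsNone(execute_complete(event))

    def test_non_success_raises(self):
        # Cancelled and errored jobs must both fail the task.
        for status in ("cancelled", "error"):
            with self.subTest(status=status):
                event = {"status": status, "message": status, "job_id": 1}
                with self.assertRaises(AirflowException):
                    execute_complete(event)
```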
##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -127,7 +127,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
"""
if event["status"] == "error":
self.log.debug("Error occurred with context: %s", context)
- raise AirflowException(event["message"])
+ raise RuntimeError(event["message"])
Review Comment:
Raising a bare `RuntimeError` here is inconsistent with the rest of this
operator (which raises `AirflowException` for job failures/cancellation) and
with common deferrable-operator patterns in Airflow (often a domain-specific
Airflow exception type). Consider raising `AirflowException` (or a
provider-specific exception) and update the test accordingly, so task failure
semantics and exception handling remain consistent for users/operators.
```suggestion
raise AirflowException(event["message"])
```
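To illustrate why the exception type matters to callers, here is a small sketch. `AirflowException` is a local stand-in and `run_with_handler` is a hypothetical caller, not Airflow code; it represents any wrapper that catches only the Airflow exception type.

```python
class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption for this sketch)."""


def run_with_handler(exc_type):
    """Simulate a caller that only handles AirflowException."""
    try:
        raise exc_type("Airbyte sync failed")
    except AirflowException as err:
        return f"handled: {err}"
```

Calling `run_with_handler(AirflowException)` goes through the handler, while `run_with_handler(RuntimeError)` propagates the error past it, so code written against the previous behaviour would stop catching these failures.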