Copilot commented on code in PR #64119:
URL: https://github.com/apache/airflow/pull/64119#discussion_r3025331486


##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -127,7 +127,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
         """
         if event["status"] == "error":
             self.log.debug("Error occurred with context: %s", context)
-            raise AirflowException(event["message"])
+            raise RuntimeError(event["message"])
 
         self.log.info("%s completed successfully.", self.task_id)
         return None

Review Comment:
   `execute_complete()` treats any non-`"error"` event as success. The trigger 
can emit `{"status": "cancelled", ...}` (see `AirbyteSyncTrigger.run()`), and 
the non-deferrable `execute()` path treats `JobStatusEnum.CANCELLED` as a 
failure (it raises). As a result, deferrable runs incorrectly succeed when the 
Airbyte job is cancelled. Handle `"cancelled"` (and any unknown status) 
explicitly: for example, treat only `"success"` as success and raise for 
`"cancelled"`, `"error"`, and any unknown status.
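
A minimal sketch of the status handling described above, written as a standalone function so it runs without Airflow installed. The `AirflowException` class here is a local stand-in for `airflow.exceptions.AirflowException`, and `check_trigger_event` is a hypothetical helper name; in the operator this logic would presumably live in `execute_complete()`.

```python
from typing import Any, Dict, Optional


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption)."""


def check_trigger_event(event: Optional[Dict[str, Any]]) -> None:
    """Return None only for a "success" event; raise for everything else."""
    if not event:
        raise AirflowException("Trigger returned no event.")

    status = event.get("status")
    if status == "success":
        return None

    # "error", "cancelled" and any unrecognised status all fail the task.
    message = event.get("message") or f"Airbyte job ended with status {status!r}"
    raise AirflowException(message)
```

Allow-listing `"success"` rather than deny-listing `"error"` means a status added to the trigger later fails loudly instead of silently passing.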



##########
providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py:
##########
@@ -67,6 +68,52 @@ def test_execute(self, mock_wait_for_job, mock_submit_sync_connection, create_co
             job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout
         )
 
+    @pytest.mark.parametrize("status", ["success", "cancelled"])
+    def test_execute_complete_non_error_states(self, status, create_connection_without_db):
+        conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
+        create_connection_without_db(conn)
+
+        op = AirbyteTriggerSyncOperator(
+            task_id="test_airbyte_op",
+            airbyte_conn_id=self.airbyte_conn_id,
+            connection_id=self.connection_id,
+            wait_seconds=self.wait_seconds,
+            timeout=self.timeout,
+            deferrable=True,
+        )
+
+        event = {
+            "status": status,
+            "message": "succeeded/cancelled",
+            "job_id": self.job_id,
+        }
+
+        result = op.execute_complete(context={}, event=event)
+
+        assert result is None

Review Comment:
   This test asserts that `execute_complete()` succeeds when `event["status"] 
== "cancelled"`, but the operator’s `execute()` method treats a cancelled 
Airbyte job as a failure (it raises on `JobStatusEnum.CANCELLED`). The trigger 
also emits `status="cancelled"`. The test should expect an exception for 
`cancelled` (or be removed) and a separate test should assert success only for 
`status="success"`.



##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -127,7 +127,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
         """
         if event["status"] == "error":
             self.log.debug("Error occurred with context: %s", context)
-            raise AirflowException(event["message"])
+            raise RuntimeError(event["message"])

Review Comment:
   Raising a bare `RuntimeError` here is inconsistent with the rest of this 
operator (which raises `AirflowException` for job failures/cancellation) and 
with common deferrable-operator patterns in Airflow (which often use a 
domain-specific Airflow exception type). Consider raising `AirflowException` 
(or a provider-specific exception) and updating the test accordingly, so task 
failure semantics and exception handling remain consistent for users/operators.
   ```suggestion
               raise AirflowException(event["message"])
   ```


