Copilot commented on code in PR #64051:
URL: https://github.com/apache/airflow/pull/64051#discussion_r3066494627
##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -125,8 +139,32 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
         Relies on trigger to throw an exception, otherwise it assumes execution was
         successful.
         """
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
         if event["status"] == "error":
             self.log.debug("Error occurred with context: %s", context)
+            raise RuntimeError(event["message"])
+
+        job_id = event.get("job_id")
+        if event["status"] == "timeout":
+            if job_id:
+                self.log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
+                try:
+                    hook.cancel_job(job_id=job_id)
+                except Exception as e:
+                    self.log.warning("Failed to cancel Airbyte job %s: %s", job_id, e)
+            else:
+                self.log.warning("No job_id found; skipping cancellation")
+
+            raise RuntimeError(event["message"])
+
+        job_id = event.get("job_id")
+        if event["status"] == "timeout":
+            if job_id:
+                self.log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
+                hook.cancel_job(job_id=job_id)
+            else:
+                self.log.warning("No job_id found; skipping cancellation")
+
Review Comment:
The `timeout` handling block is duplicated (lines 147–158 and 160–168). The
second block is unreachable because the first one always raises, and it also
changes behavior (no try/except + different exception type). Consolidate into a
single `timeout` branch (with the intended best-effort cancellation semantics)
and raise one consistent Airflow exception type for timeouts/errors.
```suggestion
```
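A minimal sketch of what a single consolidated branch could look like, written as a standalone function so it can be run outside Airflow. The names `handle_trigger_event`, `TaskFailed`, and `TaskTimedOut` are hypothetical stand-ins (the real operator would use `self.log`, the `AirbyteHook` instance, and whichever Airflow exception types the maintainers settle on); only `cancel_job(job_id=...)` and the event keys are taken from the diff above.

```python
import logging

log = logging.getLogger(__name__)


class TaskFailed(Exception):
    """Hypothetical stand-in for a generic Airflow failure exception."""


class TaskTimedOut(TaskFailed):
    """Hypothetical stand-in for a timeout-specific exception."""


def handle_trigger_event(event, hook):
    """Handle each trigger status exactly once; raise on error or timeout."""
    status = event["status"]
    if status == "error":
        raise TaskFailed(event["message"])

    if status == "timeout":
        job_id = event.get("job_id")
        if job_id:
            log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
            try:
                # Best effort: a failed cancel must not mask the timeout itself.
                hook.cancel_job(job_id=job_id)
            except Exception as exc:
                log.warning("Failed to cancel Airbyte job %s: %s", job_id, exc)
        else:
            log.warning("No job_id found; skipping cancellation")
        raise TaskTimedOut(event["message"])

    # Any other status is treated as success in this sketch.
    return None
```

With one branch per status there is no unreachable code, and the cancellation failure path and the raised exception type are the same on every timeout.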
##########
providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py:
##########
@@ -38,6 +38,8 @@ class AirbyteSyncTrigger(BaseTrigger):
     :param job_id: The ID of an Airbyte Sync job.
     :param end_time: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
Review Comment:
`end_time` is documented as a duration ('Time in seconds to wait') but the
trigger logic compares it to `time.time()`, implying it is an absolute
timestamp (deadline). Since this PR introduces another absolute deadline
(`execution_deadline`), it’s important to clarify `end_time` as an absolute
timestamp too (or rename/rework it) to avoid API/documentation mismatch.
```suggestion
    :param end_time: Absolute timestamp (in seconds since the epoch) by which the job run
        must reach a terminal status. Defaults to 7 days from the trigger start time.
```
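To illustrate why the distinction matters, here is a small sketch of the duration-versus-deadline mix-up. The helper names are hypothetical and the comparison operator is an assumption; the trigger's actual check may differ in detail.

```python
import time


def deadline_from_duration(duration_seconds, now=None):
    """Convert a wait duration into an absolute epoch deadline."""
    now = time.time() if now is None else now
    return now + duration_seconds


def is_past_deadline(end_time, now=None):
    """Return True once the current time has passed the absolute deadline."""
    now = time.time() if now is None else now
    return end_time < now
```

A caller who follows the current docstring and passes a duration such as `60` directly as `end_time` would see the deadline treated as already expired, because 60 seconds since the epoch is far in the past.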
##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -82,7 +87,15 @@ def execute(self, context: Context) -> None:
         job_object = hook.submit_sync_connection(connection_id=self.connection_id)
         self.job_id = job_object.job_id
         state = job_object.status
+
+        # Derive absolute deadlines for deferrable execution.
+        # execution_timeout is a hard task-level limit (cancels the job),
+        # while timeout only limits how long we wait for the job to finish.
+        # If both are set, the earliest deadline wins.
         end_time = time.time() + self.timeout
+        execution_deadline = None
+        if self.execution_timeout:
+            execution_deadline = time.time() + self.execution_timeout.total_seconds()
Review Comment:
`if self.execution_timeout:` relies on truthiness and will skip deadline
computation for `timedelta(0)` (which is falsy), even though a zero execution
timeout should be enforced. Prefer an explicit `is not None` check, and
consider capturing `now = time.time()` once to avoid drift between `end_time`
and `execution_deadline` computations.
```suggestion
        now = time.time()
        end_time = now + self.timeout
        execution_deadline = None
        if self.execution_timeout is not None:
            execution_deadline = now + self.execution_timeout.total_seconds()
```
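The falsy-`timedelta(0)` behavior can be checked in isolation. The sketch below pulls the deadline computation into a hypothetical helper, `compute_deadlines`, that takes `now` as an argument so both deadlines share one clock reading; it is not part of the operator.

```python
from datetime import timedelta


def compute_deadlines(now, timeout, execution_timeout):
    """Return (end_time, execution_deadline) derived from a single clock reading."""
    end_time = now + timeout
    execution_deadline = None
    # Explicit None check: timedelta(0) is falsy but is still a real limit.
    if execution_timeout is not None:
        execution_deadline = now + execution_timeout.total_seconds()
    return end_time, execution_deadline


# A zero timedelta evaluates as False, so a truthiness check would skip it.
zero_is_truthy = bool(timedelta(0))
```

With a truthiness check, `timedelta(0)` would leave `execution_deadline` as `None`; with the `is not None` check it yields a deadline equal to `now`.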
##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -125,8 +139,32 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
         Relies on trigger to throw an exception, otherwise it assumes execution was
         successful.
         """
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
         if event["status"] == "error":
             self.log.debug("Error occurred with context: %s", context)
+            raise RuntimeError(event["message"])
Review Comment:
Raising `RuntimeError` from an operator callback is inconsistent with
typical Airflow operator semantics and makes it harder to handle failures
uniformly. Prefer raising an Airflow exception type (e.g., `AirflowException`
for generic errors and `AirflowTaskTimeout` for execution timeouts), and update
the new unit tests accordingly.