kakatur commented on code in PR #64745:
URL: https://github.com/apache/airflow/pull/64745#discussion_r3068197538
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -330,18 +337,38 @@ def submit_job(self, context: Context):
job_id=self.job_id,
)
- def monitor_job(self, context: Context):
+ def _persist_links(
+ self, context: Context, skip_cloudwatch: bool = False,
job_description: dict | None = None
+ ) -> dict:
"""
- Monitor an AWS Batch job.
+ Persist operator links for UI display.
- This can raise an exception or an AirflowTaskTimeout if the task was
- created with ``execution_timeout``.
+ This method retrieves job details and persists the operator links
+ (job definition, job queue, CloudWatch logs) as XCom values so they
+ can be rendered in the Airflow UI.
+
+ :param context: Task context
+ :param skip_cloudwatch: If True, skip fetching CloudWatch logs (useful
before deferring)
+ :param job_description: Optional pre-fetched job description to avoid
redundant API calls
+ :return: Job description dict
"""
if not self.job_id:
raise AirflowException("AWS Batch job - job_id was not found")
+ # Fetch job description (needed for return value and link persistence)
+ try:
+ job_desc = job_description or
self.hook.get_job_description(self.job_id)
+ except KeyError:
+ self.log.warning("AWS Batch job (%s) description not available",
self.job_id)
+ return {}
Review Comment:
Removed dead except KeyError block (lines 359-362).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]