Copilot commented on code in PR #64745:
URL: https://github.com/apache/airflow/pull/64745#discussion_r3041387374
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -222,6 +222,9 @@ def execute(self, context: Context) -> str | None:
if not self.job_id:
raise AirflowException("AWS Batch job - job_id was not found")
+        # Persist operator links before deferring so they're available in the UI
+ self._persist_links(context)
+
job = self.hook.get_job_description(self.job_id)
job_status = job.get("status")
if job_status == self.hook.SUCCESS_STATE:
Review Comment:
In the deferrable path this adds multiple `describe_jobs` calls in quick
succession: `_persist_links()` calls `get_job_description()` and then
`get_job_all_awslogs_info()` (which calls `get_job_description()` again), and
`execute()` then calls `get_job_description()` again for `job_status`. This
increases AWS API usage and can hit Batch API throttling. Consider refactoring
to reuse a single job description (e.g., have `_persist_links` accept/return
`job_desc`) and/or avoid fetching CloudWatch log info before deferring (it is
often unavailable immediately).
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -403,6 +395,50 @@ def monitor_job(self, context: Context):
aws_partition=self.hook.conn_partition,
**awslogs[0],
)
+ else:
+ # Persist placeholder to prevent "XCom not found" warnings
+ # CloudWatch logs will be updated when job completes
+ context["task_instance"].xcom_push(
+ key="cloudwatch_events",
+ value=None,
+ )
+
+ def monitor_job(self, context: Context):
+ """
+ Monitor an AWS Batch job.
+
+ This can raise an exception or an AirflowTaskTimeout if the task was
+ created with ``execution_timeout``.
+ """
+ if not self.job_id:
+ raise AirflowException("AWS Batch job - job_id was not found")
+
+ # Persist operator links
+ self._persist_links(context)
+
+ if self.awslogs_enabled:
+            if self.waiters:
+                self.waiters.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
+            else:
+                self.hook.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
+ else:
+ if self.waiters:
+ self.waiters.wait_for_job(self.job_id)
+ else:
+ self.hook.wait_for_job(self.job_id)
+
+ # Log all CloudWatch log stream links for user reference
+ try:
+ awslogs = self.hook.get_job_all_awslogs_info(self.job_id)
+ if awslogs:
+                self.log.info(
+                    "AWS Batch job (%s) CloudWatch Events details found. Links to logs:", self.job_id
+                )
+ link_builder = CloudWatchEventsLink()
+ for log in awslogs:
+ self.log.info(link_builder.format_link(**log))
+ except AirflowException as ae:
+            self.log.warning("Cannot determine where to find the AWS logs for this Batch job: %s", ae)
Review Comment:
`monitor_job()` calls `_persist_links()` (which already calls
`get_job_all_awslogs_info()`), then after waiting it calls
`get_job_all_awslogs_info()` again just to log stream links. This adds extra
`describe_jobs` calls and will break the existing expectation in unit tests that
`get_job_description` is called exactly twice during `execute()` (see
`test_execute_without_failures`). Consider fetching/logging/persisting
CloudWatch log info only once (e.g., do it after waiting, like the previous
implementation, or have `_persist_links` optionally skip CloudWatch work when
called pre-wait).
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -257,6 +260,9 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None
self.job_id = validated_event["job_id"]
+ # Persist operator links for UI
+ self._persist_links(context)
Review Comment:
`execute_complete()` now unconditionally calls `_persist_links(context)`,
which performs AWS API calls and assumes the context contains a task instance.
In this repo's unit tests `execute_complete` is invoked with an empty dict
(`context={}`), and `_persist_links` can raise (e.g., when it hits the
CloudWatch placeholder `context["task_instance"].xcom_push`). Either guard
`_persist_links` when required context keys are missing, or update the
tests/mocks to pass a real TI context and stub the hook calls.
```suggestion
        # Persist operator links for UI when task instance context is available.
        if context and context.get("task_instance") is not None:
            self._persist_links(context)
```
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -403,6 +395,50 @@ def monitor_job(self, context: Context):
aws_partition=self.hook.conn_partition,
**awslogs[0],
)
+ else:
+ # Persist placeholder to prevent "XCom not found" warnings
+ # CloudWatch logs will be updated when job completes
+ context["task_instance"].xcom_push(
+ key="cloudwatch_events",
+ value=None,
+ )
Review Comment:
This manually pushes an XCom placeholder even when `do_xcom_push=False`,
uses a hard-coded key string, and directly indexes `context["task_instance"]`
(KeyError if not present). It also isn’t needed for `BaseAwsLink.get_link()`
since missing/falsey XCom values already render as no link. Prefer removing the
placeholder entirely, or at least respect `operator.do_xcom_push` and use
`CloudWatchEventsLink.key` plus the same `context["ti"]` convention used by
`BaseAwsLink.persist()`.
```suggestion
```
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -368,33 +375,18 @@ def monitor_job(self, context: Context):
job_queue_arn=job_queue_arn,
)
- if self.awslogs_enabled:
- if self.waiters:
-                self.waiters.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
-            else:
-                self.hook.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
- else:
- if self.waiters:
- self.waiters.wait_for_job(self.job_id)
- else:
- self.hook.wait_for_job(self.job_id)
-
+ # Persist CloudWatch logs link if available
awslogs = []
try:
awslogs = self.hook.get_job_all_awslogs_info(self.job_id)
except AirflowException as ae:
-            self.log.warning("Cannot determine where to find the AWS logs for this Batch job: %s", ae)
+            # CloudWatch logs may not be available immediately after job submission
+            self.log.info("CloudWatch logs not yet available for Batch job: %s", ae)
Review Comment:
The `except AirflowException` branch assumes the failure to fetch log info
is transient (“not yet available”) and logs at INFO.
`get_job_all_awslogs_info()` can raise `AirflowException` for non-transient
cases too (e.g. unsupported job type), so this message/level is misleading and
can hide real configuration issues. Consider logging a more neutral message (or
warning) and differentiating transient “stream not created yet” (which returns
`[]`) from actual exceptions.
```suggestion
            self.log.warning(
                "Unable to retrieve CloudWatch log information for AWS Batch job (%s): %s",
                self.job_id,
                ae,
            )
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]