Copilot commented on code in PR #64745:
URL: https://github.com/apache/airflow/pull/64745#discussion_r3068421599


##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -404,6 +418,33 @@ def monitor_job(self, context: Context):
                 **awslogs[0],
             )
 
+    def monitor_job(self, context: Context):
+        """
+        Monitor an AWS Batch job.
+
+        This can raise an exception or an AirflowTaskTimeout if the task was
+        created with ``execution_timeout``.
+        """
+        if not self.job_id:
+            raise AirflowException("AWS Batch job - job_id was not found")
+
+        # Persist job definition and queue links
+        self._persist_links(context)
+
+        if self.awslogs_enabled:
+            if self.waiters:
+                self.waiters.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
+            else:
+                self.hook.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
+        else:
+            if self.waiters:
+                self.waiters.wait_for_job(self.job_id)
+            else:
+                self.hook.wait_for_job(self.job_id)
+
+        # After job completes, persist CloudWatch logs
+        self._persist_cloudwatch_link(context)

Review Comment:
   `monitor_job()` now persists the CloudWatch link unconditionally after 
waiting. This means even when `awslogs_enabled=False` the operator will still 
call `get_job_all_awslogs_info()` (extra `describe_jobs` call + potential 
warnings from the hook about missing streams / non-aws log drivers). If 
`awslogs_enabled` is intended to disable CloudWatch log integration, consider 
gating `_persist_cloudwatch_link()` behind that flag (or introduce a separate 
flag for UI link persistence) to avoid unexpected API usage/noisy logs.
   ```suggestion
        # After job completes, persist CloudWatch logs when log integration is enabled.
           if self.awslogs_enabled:
               self._persist_cloudwatch_link(context)
   ```
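To make the effect of the suggested gate concrete, here is a minimal standalone sketch (hypothetical `FakeHook`/`SketchOperator` names; this reduces the flow under review and is not the real operator):

```python
class FakeHook:
    """Stand-in for the Batch hook that counts describe_jobs-style calls."""

    def __init__(self):
        self.describe_calls = 0

    def get_job_all_awslogs_info(self, job_id):
        self.describe_calls += 1
        return []


class SketchOperator:
    """Reduced model of monitor_job(): the CloudWatch link lookup
    runs only when awslogs_enabled is set (waiting is elided)."""

    def __init__(self, hook, awslogs_enabled):
        self.hook = hook
        self.awslogs_enabled = awslogs_enabled

    def monitor_job(self, job_id):
        # job waiting elided in this sketch
        if self.awslogs_enabled:
            self.hook.get_job_all_awslogs_info(job_id)


op = SketchOperator(FakeHook(), awslogs_enabled=False)
op.monitor_job("job-123")
assert op.hook.describe_calls == 0  # no extra describe_jobs when logs are disabled

op = SketchOperator(FakeHook(), awslogs_enabled=True)
op.monitor_job("job-123")
assert op.hook.describe_calls == 1
```

With the unconditional call the first assertion would fail, which is exactly the extra API usage the comment flags.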



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -368,33 +384,31 @@ def monitor_job(self, context: Context):
                 job_queue_arn=job_queue_arn,
             )
 
-        if self.awslogs_enabled:
-            if self.waiters:
-                self.waiters.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
-            else:
-                self.hook.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
-        else:
-            if self.waiters:
-                self.waiters.wait_for_job(self.job_id)
-            else:
-                self.hook.wait_for_job(self.job_id)
+        return job_desc
+
+    def _persist_cloudwatch_link(self, context: Context) -> None:
+        """
+        Persist CloudWatch logs link if available.
+
+        :param context: Task context
+        """
+        if not self.do_xcom_push:
+            return
 

Review Comment:
   `_persist_cloudwatch_link()` makes AWS API calls 
(`get_job_all_awslogs_info`) even when the task context can’t persist operator 
links (e.g. `context` is `{}` or missing `"ti"`). Since `BaseAwsLink.persist()` 
requires `context["ti"]` (and will no-op via `return_on_error`), consider an 
early return when `not context` or `"ti" not in context` to avoid unnecessary 
Batch `describe_jobs` calls and to keep unit tests that call 
`execute_complete(context={...})` from needing to stub AWS hook methods.
   ```suggestion
   
           if not context or "ti" not in context:
               return
   ```
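The proposed early return can likewise be sketched in isolation (hypothetical helper and hook names; the real `_persist_cloudwatch_link()` is not reproduced here):

```python
class FakeHook:
    """Stand-in hook that records whether an AWS lookup was made."""

    def __init__(self):
        self.calls = 0

    def get_job_all_awslogs_info(self, job_id):
        self.calls += 1
        return []


def persist_cloudwatch_link(hook, context, job_id):
    """Guarded variant: skip the AWS lookup when the context cannot
    persist operator links (no "ti" key), as the suggestion proposes."""
    if not context or "ti" not in context:
        return
    hook.get_job_all_awslogs_info(job_id)
    # actual link persistence via context["ti"] omitted in this sketch


hook = FakeHook()
persist_cloudwatch_link(hook, {}, "job-123")    # empty context: no AWS call
persist_cloudwatch_link(hook, None, "job-123")  # missing context: no AWS call
assert hook.calls == 0

persist_cloudwatch_link(hook, {"ti": object()}, "job-123")
assert hook.calls == 1
```

Under this guard, a unit test calling `execute_complete(context={...})` with a bare context never reaches the hook, so no AWS stubbing is needed.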



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
