kakatur commented on code in PR #64745:
URL: https://github.com/apache/airflow/pull/64745#discussion_r3293396221
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -252,14 +258,17 @@ def execute(self, context: Context) -> str | None:
def execute_complete(self, context: Context, event: dict[str, Any] | None
= None) -> str:
validated_event = validate_execute_complete_event(event)
+ # Set job_id first so CloudWatch link can be persisted even on failure
+ self.job_id = validated_event["job_id"]
+
+ # Persist CloudWatch logs for both success and failure
+ self._persist_cloudwatch_link(context)
+
if validated_event["status"] != "success":
raise AirflowException(f"Error while running job:
{validated_event}")
- self.job_id = validated_event["job_id"]
-
- # Fetch logs if awslogs_enabled
- if self.awslogs_enabled:
- self.monitor_job(context) # fetch logs, no need to return
+ # Check job success (already know status is "success" from above)
+ self.hook.check_job_success(self.job_id)
self.log.info("Job completed successfully for job_id: %s", self.job_id)
return self.job_id
Review Comment:
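Restore CloudWatch log fetching in deferrable mode when `awslogs_enabled=True`. This change removes the `if self.awslogs_enabled: self.monitor_job(context)` block from `execute_complete`, so job logs are no longer fetched there.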
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]