yash1thsa commented on code in PR #55703:
URL: https://github.com/apache/airflow/pull/55703#discussion_r2353160368


##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -250,14 +250,22 @@ def execute(self, context: Context) -> str | None:
 
         return self.job_id
 
-    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> str:
+    def execute_complete(
+        self, context: Context, event: dict[str, Any] | None = None
+    ) -> str:
         validated_event = validate_execute_complete_event(event)
 
         if validated_event["status"] != "success":
             raise AirflowException(f"Error while running job: {validated_event}")
 
-        self.log.info("Job completed.")
-        return validated_event["job_id"]
+        self.job_id = validated_event["job_id"]
+
+        # Fetch logs if awslogs_enabled
+        if self.awslogs_enabled:
+            self.monitor_job(context)  # fetch logs, no need to return

Review Comment:
   @o-nikolas  Thanks for the comment - `self.monitor_job` should be triggered only once, through `execute_complete` instead of the `execute` method. As a result, users should not see multiple start-up log messages. Here is a formatted version of the log, let me know your thoughts (it looks similar if `deferrable` is set to false) - 
   
   Log message source details
   sources = [
     "/root/airflow/logs/dag_id=batch_deferred_log/run_id=manual__2025-09-16T16:39:34.512871+00:00/task_id=run_batch_job/attempt=1.log",
     "/root/airflow/logs/dag_id=batch_deferred_log/run_id=manual__2025-09-16T16:39:34.512871+00:00/task_id=run_batch_job/attempt=1.log.trigger.3.log"
   ]
   
   INFO     DAG bundles loaded: dags-folder
            source=airflow.dag_processing.bundles.manager.DagBundlesManager
            loc=manager.py:179
   
   INFO     Filling up the DagBag from /files/dags/batch_deferred_log.py
            source=airflow.models.dagbag.DagBag
            loc=dagbag.py:593
   
   INFO     Running AWS Batch job - job definition: my-test-definition - on queue my-test-queue
            source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
            loc=batch.py:280
   
   INFO     AWS Connection (conn_id='aws_default1', conn_type='aws') credentials retrieved from login and password.
            source=airflow.providers.amazon.aws.utils.connection_wrapper.AwsConnectionWrapper
            loc=connection_wrapper.py:334
   
   INFO     AWS Batch job (d3038fd5-5982-4d1a-bfc3-57a6fe256c7d) started
            details={ ResponseMetadata:{...}, jobArn:..., jobName:..., jobId:... }
            source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
            loc=batch.py:327
   
   INFO     Pausing task as DEFERRED
            dag_id=batch_deferred_log
            task_id=run_batch_job
            run_id=manual__2025-09-16T16:39:34.512871+00:00
            source=task
            loc=task_runner.py:844
   
   WARNING  No XCom value found; defaulting to None
            key=batch_job_definition
            dag_id=batch_deferred_log task_id=run_batch_job run_id=manual__2025-09-16T16:39:34.512871+00:00 map_index=-1
            source=task
            loc=xcom.py:264
   
   WARNING  No XCom value found; defaulting to None
            key=batch_job_queue
            dag_id=batch_deferred_log task_id=run_batch_job run_id=manual__2025-09-16T16:39:34.512871+00:00 map_index=-1
            source=task
            loc=xcom.py:264
   
   WARNING  No XCom value found; defaulting to None
            key=cloudwatch_events
            dag_id=batch_deferred_log task_id=run_batch_job run_id=manual__2025-09-16T16:39:34.512871+00:00 map_index=-1
            source=task
            loc=xcom.py:264
   
   INFO     trigger batch_deferred_log/manual__2025-09-16T16:39:34.512871+00:00/run_batch_job/-1/1 (ID 2) starting
            loc=triggerer_job_runner.py:1099
   
   INFO     AWS Connection (conn_id='aws_default1', conn_type='aws') credentials retrieved from login and password.
            source=airflow.providers.amazon.aws.utils.connection_wrapper.AwsConnectionWrapper
            loc=connection_wrapper.py:334
   
   INFO     Batch job d3038fd5-5982-4d1a-bfc3-57a6fe256c7d not ready yet: ['STARTING']
            source=airflow.providers.amazon.aws.utils.waiter_with_logging
            loc=waiter_with_logging.py:158
   
   INFO     Batch job d3038fd5-5982-4d1a-bfc3-57a6fe256c7d not ready yet: ['RUNNING']
            source=airflow.providers.amazon.aws.utils.waiter_with_logging
            loc=waiter_with_logging.py:158
   
   INFO     Trigger fired
            event name=batch_deferred_log/manual__2025-09-16T16:39:34.512871+00:00/run_batch_job/-1/1 (ID 2)
            result=TriggerEvent<{'status': 'success', 'job_id': 'd3038fd5-5982-4d1a-bfc3-57a6fe256c7d'}>
            loc=triggerer_job_runner.py:1102
   
   INFO     trigger completed
            name=batch_deferred_log/manual__2025-09-16T16:39:34.512871+00:00/run_batch_job/-1/1 (ID 2)
            loc=triggerer_job_runner.py:1123
   
   INFO     DAG bundles loaded: dags-folder
            source=airflow.dag_processing.bundles.manager.DagBundlesManager
            loc=manager.py:179
   
   INFO     Filling up the DagBag from /files/dags/batch_deferred_log.py
            source=airflow.models.dagbag.DagBag
            loc=dagbag.py:593
   
   INFO     AWS Connection (conn_id='aws_default1', conn_type='aws') credentials retrieved from login and password.
            source=airflow.providers.amazon.aws.utils.connection_wrapper.AwsConnectionWrapper
            loc=connection_wrapper.py:334
   
   INFO     AWS Batch job (d3038fd5-5982-4d1a-bfc3-57a6fe256c7d) Job Definition ARN: ...
            Job Queue ARN: ...
            source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
            loc=batch.py:350
   
   INFO     AWS Batch job (...) check status (SUCCEEDED) in ['RUNNING','SUCCEEDED','FAILED']
            source=airflow.task.hooks.airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook
            loc=batch_client.py:381
   
   INFO     AWS Batch job (...) check status (SUCCEEDED) in ['SUCCEEDED','FAILED']
            source=airflow.task.hooks.airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook
            loc=batch_client.py:381
   
   INFO     AWS Connection (conn_id='aws_default1', conn_type='aws') credentials retrieved from login and password.
            source=airflow.providers.amazon.aws.utils.connection_wrapper.AwsConnectionWrapper
            loc=connection_wrapper.py:334
   
   INFO     [2025-09-16 16:39:58,113] hello world
            source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
            loc=task_log_fetcher.py:75
   
   INFO     AWS Batch job (...) has completed
            source=airflow.task.hooks.airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook
            loc=batch_client.py:311
   
   INFO     AWS Batch job (...) CloudWatch Events details found. Links to logs:
            source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
            loc=batch.py:392
   
   INFO     (empty log line)
            source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
            loc=batch.py:395
   
   INFO     AWS Batch job (...) succeeded
            details={ jobArn:..., jobName:..., jobId:..., jobQueue:..., status:..., attempts:[...], container:{...}, etc. }
            source=airflow.task.hooks.airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook
            loc=batch_client.py:270
   
   INFO     AWS Batch job (...) succeeded
            source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
            loc=batch.py:411
   
   INFO     Job completed successfully for job_id: d3038fd5-5982-4d1a-bfc3-57a6fe256c7d
            source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
            loc=batch.py:267
   
   INFO     Pushing xcom
            ti=RuntimeTaskInstance(...full object...)
            source=task
            loc=task_runner.py:1352
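   To illustrate the behavior being discussed, here is a minimal, self-contained sketch of the pattern (hypothetical `BatchOperatorSketch` class and stubbed `monitor_job`, not the real Airflow operator): the deferred task resumes in `execute_complete`, which validates the trigger event, records the job id, and fetches logs exactly once, so no duplicate start-up messages appear from `execute`.
   
   ```python
   from typing import Any
   
   
   class BatchOperatorSketch:
       """Hypothetical stand-in for the deferrable completion path."""
   
       def __init__(self, awslogs_enabled: bool = False):
           self.awslogs_enabled = awslogs_enabled
           self.job_id: str | None = None
           self.monitor_calls = 0  # counts how many times logs are fetched
   
       def monitor_job(self, context: dict) -> None:
           # In the real operator this would poll the Batch job and stream
           # its CloudWatch logs; here we only record that it was called.
           self.monitor_calls += 1
   
       def execute_complete(
           self, context: dict, event: dict[str, Any] | None = None
       ) -> str:
           # Reject missing or unsuccessful trigger events.
           if event is None or event.get("status") != "success":
               raise RuntimeError(f"Error while running job: {event}")
           self.job_id = event["job_id"]
           # Fetch logs here, once, instead of in execute(): the task only
           # resumes after the trigger fires, so this path runs a single time.
           if self.awslogs_enabled:
               self.monitor_job(context)
           return self.job_id
   ```
   
   With `awslogs_enabled=True`, a successful event yields exactly one `monitor_job` call; a non-success event raises before any log fetching.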
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
