yash1thsa commented on code in PR #55703:
URL: https://github.com/apache/airflow/pull/55703#discussion_r2353160368
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -250,14 +250,22 @@ def execute(self, context: Context) -> str | None:
return self.job_id
- def execute_complete(self, context: Context, event: dict[str, Any] | None
= None) -> str:
+ def execute_complete(
+ self, context: Context, event: dict[str, Any] | None = None
+ ) -> str:
validated_event = validate_execute_complete_event(event)
if validated_event["status"] != "success":
raise AirflowException(f"Error while running job:
{validated_event}")
- self.log.info("Job completed.")
- return validated_event["job_id"]
+ self.job_id = validated_event["job_id"]
+
+ # Fetch logs if awslogs_enabled
+ if self.awslogs_enabled:
+ self.monitor_job(context) # fetch logs, no need to return
Review Comment:
@o-nikolas Thanks for the comment - self.monitor_job should be triggered
only once through execute_complete instead of execute method. As a result users
should not be seeing multiple start up comments. Here is a formated version of
the log , let me know your thoughts (looks similar if deferable is set to
false) -
Log message source details
sources = [
"/root/airflow/logs/dag_id=batch_deferred_log/run_id=manual__2025-09-16T16:39:34.512871+00:00/task_id=run_batch_job/attempt=1.log",
"/root/airflow/logs/dag_id=batch_deferred_log/run_id=manual__2025-09-16T16:39:34.512871+00:00/task_id=run_batch_job/attempt=1.log.trigger.3.log"
]
INFO DAG bundles loaded: dags-folder
source=airflow.dag_processing.bundles.manager.DagBundlesManager
loc=manager.py:179
INFO Filling up the DagBag from /files/dags/batch_deferred_log.py
source=airflow.models.dagbag.DagBag
loc=dagbag.py:593
INFO Running AWS Batch job - job definition: my-test-definition - on
queue my-test-queue
source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
loc=batch.py:280
INFO AWS Connection (conn_id='aws_default1', conn_type='aws')
credentials retrieved from login and password.
source=airflow.providers.amazon.aws.utils.connection_wrapper.AwsConnectionWrapper
loc=connection_wrapper.py:334
INFO AWS Batch job (d3038fd5-5982-4d1a-bfc3-57a6fe256c7d) started
details={ ResponseMetadata:{...}, jobArn:..., jobName:...,
jobId:... }
source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
loc=batch.py:327
INFO Pausing task as DEFERRED
dag_id=batch_deferred_log
task_id=run_batch_job
run_id=manual__2025-09-16T16:39:34.512871+00:00
source=task
loc=task_runner.py:844
WARNING No XCom value found; defaulting to None
key=batch_job_definition
dag_id=batch_deferred_log task_id=run_batch_job
run_id=manual__2025-09-16T16:39:34.512871+00:00 map_index=-1
source=task
loc=xcom.py:264
WARNING No XCom value found; defaulting to None
key=batch_job_queue
dag_id=batch_deferred_log task_id=run_batch_job
run_id=manual__2025-09-16T16:39:34.512871+00:00 map_index=-1
source=task
loc=xcom.py:264
WARNING No XCom value found; defaulting to None
key=cloudwatch_events
dag_id=batch_deferred_log task_id=run_batch_job
run_id=manual__2025-09-16T16:39:34.512871+00:00 map_index=-1
source=task
loc=xcom.py:264
INFO trigger
batch_deferred_log/manual__2025-09-16T16:39:34.512871+00:00/run_batch_job/-1/1
(ID 2) starting
loc=triggerer_job_runner.py:1099
INFO AWS Connection (conn_id='aws_default1', conn_type='aws')
credentials retrieved from login and password.
source=airflow.providers.amazon.aws.utils.connection_wrapper.AwsConnectionWrapper
loc=connection_wrapper.py:334
INFO Batch job d3038fd5-5982-4d1a-bfc3-57a6fe256c7d not ready yet:
['STARTING']
source=airflow.providers.amazon.aws.utils.waiter_with_logging
loc=waiter_with_logging.py:158
INFO Batch job d3038fd5-5982-4d1a-bfc3-57a6fe256c7d not ready yet:
['RUNNING']
source=airflow.providers.amazon.aws.utils.waiter_with_logging
loc=waiter_with_logging.py:158
INFO Trigger fired
event
name=batch_deferred_log/manual__2025-09-16T16:39:34.512871+00:00/run_batch_job/-1/1
(ID 2)
result=TriggerEvent<{'status': 'success', 'job_id':
'd3038fd5-5982-4d1a-bfc3-57a6fe256c7d'}>
loc=triggerer_job_runner.py:1102
INFO trigger completed
name=batch_deferred_log/manual__2025-09-16T16:39:34.512871+00:00/run_batch_job/-1/1
(ID 2)
loc=triggerer_job_runner.py:1123
INFO DAG bundles loaded: dags-folder
source=airflow.dag_processing.bundles.manager.DagBundlesManager
loc=manager.py:179
INFO Filling up the DagBag from /files/dags/batch_deferred_log.py
source=airflow.models.dagbag.DagBag
loc=dagbag.py:593
INFO AWS Connection (conn_id='aws_default1', conn_type='aws')
credentials retrieved from login and password.
source=airflow.providers.amazon.aws.utils.connection_wrapper.AwsConnectionWrapper
loc=connection_wrapper.py:334
INFO AWS Batch job (d3038fd5-5982-4d1a-bfc3-57a6fe256c7d) Job Definition
ARN: ...
Job Queue ARN: ...
source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
loc=batch.py:350
INFO AWS Batch job (...) check status (SUCCEEDED) in
['RUNNING','SUCCEEDED','FAILED']
source=airflow.task.hooks.airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook
loc=batch_client.py:381
INFO AWS Batch job (...) check status (SUCCEEDED) in
['SUCCEEDED','FAILED']
source=airflow.task.hooks.airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook
loc=batch_client.py:381
INFO AWS Connection (conn_id='aws_default1', conn_type='aws')
credentials retrieved from login and password.
source=airflow.providers.amazon.aws.utils.connection_wrapper.AwsConnectionWrapper
loc=connection_wrapper.py:334
INFO [2025-09-16 16:39:58,113] hello world
source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
loc=task_log_fetcher.py:75
INFO AWS Batch job (...) has completed
source=airflow.task.hooks.airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook
loc=batch_client.py:311
INFO AWS Batch job (...) CloudWatch Events details found. Links to logs:
source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
loc=batch.py:392
INFO (empty log line)
source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
loc=batch.py:395
INFO AWS Batch job (...) succeeded
details={ jobArn:..., jobName:..., jobId:..., jobQueue:...,
status:..., attempts:[...], container:{...}, etc. }
source=airflow.task.hooks.airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook
loc=batch_client.py:270
INFO AWS Batch job (...) succeeded
source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
loc=batch.py:411
INFO Job completed successfully for job_id:
d3038fd5-5982-4d1a-bfc3-57a6fe256c7d
source=airflow.task.operators.airflow.providers.amazon.aws.operators.batch.BatchOperator
loc=batch.py:267
INFO Pushing xcom
ti=RuntimeTaskInstance(...full object...)
source=task
loc=task_runner.py:1352
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]