shivaam commented on code in PR #64342:
URL: https://github.com/apache/airflow/pull/64342#discussion_r3025601762
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py:
##########
@@ -87,6 +95,131 @@ def hook(self) -> AwsGenericHook:
config=self.botocore_config,
)
+ async def run(self) -> AsyncIterator[TriggerEvent]:
+ if not self.verbose:
+ async for event in super().run():
+ yield event
+ return
+
+ hook = self.hook()
+ async with (
+ await hook.get_async_conn() as glue_client,
+ await AwsLogsHook(
+ aws_conn_id=self.aws_conn_id, region_name=self.region_name
+ ).get_async_conn() as logs_client,
+ ):
+ # Get log group name from job run metadata and initial state in
one call
+ job_run_resp = await
glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
+ log_group_output, log_group_error =
get_glue_log_group_names(job_run_resp["JobRun"])
+
+ output_token: str | None = None
+ error_token: str | None = None
+ job_run_state = job_run_resp["JobRun"]["JobRunState"]
+
+ for _attempt in range(self.attempts):
+ # Fetch and print logs from both output and error streams
+ try:
+ output_token = await self._forward_logs(
+ logs_client, log_group_output, self.run_id,
output_token
+ )
+ error_token = await self._forward_logs(
+ logs_client, log_group_error, self.run_id, error_token
+ )
+ except ClientError as e:
+ self.log.error(
+ "Failed to fetch logs for Glue Job %s Run %s: %s",
+ self.job_name,
+ self.run_id,
+ e,
+ )
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Failed to fetch logs for Glue Job
{self.job_name} Run {self.run_id}: {e}",
+ self.return_key: self.return_value,
+ }
+ )
+ return
+
+ if job_run_state in ("FAILED", "TIMEOUT"):
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Glue Job {self.job_name} Run
{self.run_id}"
+ f" exited with state: {job_run_state}",
+ self.return_key: self.return_value,
+ }
+ )
+ return
+ if job_run_state in ("SUCCEEDED", "STOPPED"):
+ self.log.info(
+ "Exiting Job %s Run %s State: %s",
+ self.job_name,
+ self.run_id,
+ job_run_state,
+ )
+ yield TriggerEvent({"status": "success", self.return_key:
self.return_value})
+ return
+
+ self.log.info(
+ "Polling for AWS Glue Job %s current run state: %s",
+ self.job_name,
+ job_run_state,
+ )
+ await asyncio.sleep(self.waiter_delay)
+
+ # Fetch updated state for next iteration
+ resp = await glue_client.get_job_run(JobName=self.job_name,
RunId=self.run_id)
+ job_run_state = resp["JobRun"]["JobRunState"]
+
Review Comment:
Fixed this by moving the job run state fetch (the `get_job_run` call) from the end of the loop body to the top of the loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]