shivaam commented on code in PR #63086:
URL: https://github.com/apache/airflow/pull/63086#discussion_r2961225721


##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py:
##########
@@ -87,6 +96,108 @@ def hook(self) -> AwsGenericHook:
             config=self.botocore_config,
         )
 
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        if not self.verbose:
+            async for event in super().run():
+                yield event
+            return
+
+        hook = self.hook()
+        async with (
+            await hook.get_async_conn() as glue_client,
+            await AwsLogsHook(
+                aws_conn_id=self.aws_conn_id, region_name=self.region_name
+            ).get_async_conn() as logs_client,
+        ):
+            # Get log group name from job run metadata and initial state in 
one call
+            job_run_resp = await 
glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
+            log_group_prefix = job_run_resp["JobRun"].get("LogGroupName", 
"/aws-glue/jobs")
+            log_group_output = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
+            log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
+
+            output_token: str | None = None
+            error_token: str | None = None
+            job_run_state = job_run_resp["JobRun"]["JobRunState"]
+
+            for _attempt in range(self.attempts):
+                # Fetch and print logs from both output and error streams
+                output_token = await self._forward_logs(
+                    logs_client, log_group_output, self.run_id, output_token
+                )
+                error_token = await self._forward_logs(logs_client, 
log_group_error, self.run_id, error_token)
+
+                if job_run_state in ("FAILED", "TIMEOUT"):
+                    raise AirflowException(
+                        f"Glue Job {self.job_name} Run {self.run_id} exited 
with state: {job_run_state}"
+                    )
+                if job_run_state in ("SUCCEEDED", "STOPPED"):
+                    self.log.info(
+                        "Exiting Job %s Run %s State: %s",
+                        self.job_name,
+                        self.run_id,
+                        job_run_state,
+                    )
+                    yield TriggerEvent({"status": "success", self.return_key: 
self.return_value})
+                    return
+
+                self.log.info(
+                    "Polling for AWS Glue Job %s current run state: %s",
+                    self.job_name,
+                    job_run_state,
+                )
+                await asyncio.sleep(self.waiter_delay)
+
+                # Fetch updated state for next iteration
+                resp = await glue_client.get_job_run(JobName=self.job_name, 
RunId=self.run_id)
+                job_run_state = resp["JobRun"]["JobRunState"]
+
+            raise AirflowException(
+                f"Glue Job {self.job_name} Run {self.run_id} waiter exceeded 
max attempts ({self.attempts})"
+            )
+
+    async def _forward_logs(
+        self,
+        logs_client: Any,
+        log_group: str,
+        log_stream: str,
+        next_token: str | None,
+    ) -> str | None:
+        # Matches the format used by the synchronous 
GlueJobHook.print_job_logs.

Review Comment:
   @Ech-Charay Thanks for the detailed review! A few thoughts:
   
   Option A — the trigger has its own polling loop and never calls 
async_job_completion(), so this refactor doesn't directly apply here. Also, 
since trigger logs are already routed to the task logs in real-time, collecting 
them into a dict and returning at the end would actually delay them reaching 
the user which in my opinion wont be a good UX. Happy to discuss further though!
   
   raise AirflowException — totally agree this is wrong. When the trigger 
raises instead of yielding a TriggerEvent, execute_complete never runs so the 
worker can't fetch the detailed Glue error reason. I'll fix this in a follow-up 
PR. The trigger should yield TriggerEvent({"status": "error", ...}) and let 
execute_complete handle it.
   
   Option B — I didnt deep dive but this seems like a good idea, though it 
would require deprecating public APIs. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to