o-nikolas commented on code in PR #63086:
URL: https://github.com/apache/airflow/pull/63086#discussion_r2926357704


##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py:
##########
@@ -87,6 +96,108 @@ def hook(self) -> AwsGenericHook:
             config=self.botocore_config,
         )
 
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        if not self.verbose:
+            async for event in super().run():
+                yield event
+            return
+
+        hook = self.hook()
+        async with (
+            await hook.get_async_conn() as glue_client,
+            await AwsLogsHook(
+                aws_conn_id=self.aws_conn_id, region_name=self.region_name
+            ).get_async_conn() as logs_client,
+        ):
+            # Get log group name from job run metadata and initial state in 
one call
+            job_run_resp = await 
glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
+            log_group_prefix = job_run_resp["JobRun"].get("LogGroupName", 
"/aws-glue/jobs")
+            log_group_output = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
+            log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
+
+            output_token: str | None = None
+            error_token: str | None = None
+            job_run_state = job_run_resp["JobRun"]["JobRunState"]
+
+            for _attempt in range(self.attempts):
+                # Fetch and print logs from both output and error streams
+                output_token = await self._forward_logs(
+                    logs_client, log_group_output, self.run_id, output_token
+                )
+                error_token = await self._forward_logs(logs_client, 
log_group_error, self.run_id, error_token)
+
+                if job_run_state in ("FAILED", "TIMEOUT"):
+                    raise AirflowException(
+                        f"Glue Job {self.job_name} Run {self.run_id} exited 
with state: {job_run_state}"
+                    )
+                if job_run_state in ("SUCCEEDED", "STOPPED"):
+                    self.log.info(
+                        "Exiting Job %s Run %s State: %s",
+                        self.job_name,
+                        self.run_id,
+                        job_run_state,
+                    )
+                    yield TriggerEvent({"status": "success", self.return_key: 
self.return_value})
+                    return
+
+                self.log.info(
+                    "Polling for AWS Glue Job %s current run state: %s",
+                    self.job_name,
+                    job_run_state,
+                )
+                await asyncio.sleep(self.waiter_delay)
+
+                # Fetch updated state for next iteration
+                resp = await glue_client.get_job_run(JobName=self.job_name, 
RunId=self.run_id)
+                job_run_state = resp["JobRun"]["JobRunState"]
+
+            raise AirflowException(
+                f"Glue Job {self.job_name} Run {self.run_id} waiter exceeded 
max attempts ({self.attempts})"
+            )
+
+    async def _forward_logs(
+        self,
+        logs_client: Any,
+        log_group: str,
+        log_stream: str,
+        next_token: str | None,
+    ) -> str | None:
+        # Matches the format used by the synchronous 
GlueJobHook.print_job_logs.
+        fetched_logs: list[str] = []
+        while True:
+            token_arg: dict[str, str] = {"nextToken": next_token} if 
next_token else {}
+            try:
+                response = await logs_client.get_log_events(
+                    logGroupName=log_group,
+                    logStreamName=log_stream,
+                    startFromHead=True,
+                    **token_arg,
+                )
+            except ClientError as e:
+                if e.response["Error"]["Code"] == "ResourceNotFoundException":
+                    self.log.warning(
+                        "No new Glue driver logs so far.\n"
+                        "If this persists, check the CloudWatch dashboard at: %r.",
+                        f"https://{self.region_name}.console.aws.amazon.com/cloudwatch/home",
+                    )
+                    return None
+                raise
+
+            events = response["events"]
+            fetched_logs.extend(event["message"] for event in events)
+
+            if not events or next_token == response["nextForwardToken"]:
+                break
+            next_token = response["nextForwardToken"]
+
+        if fetched_logs:
+            messages = "\t".join(line.rstrip() + "\n" for line in fetched_logs)
+            self.log.info("Glue Job Run %s Logs:\n\t%s", log_group, messages)
+        else:
+            self.log.info("No new log from the Glue Job in %s", log_group)
+
+        return response.get("nextForwardToken")

Review Comment:
   This logic can likely be shared between the two implementations.



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py:
##########
@@ -87,6 +96,108 @@ def hook(self) -> AwsGenericHook:
             config=self.botocore_config,
         )
 
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        if not self.verbose:
+            async for event in super().run():
+                yield event
+            return
+
+        hook = self.hook()
+        async with (
+            await hook.get_async_conn() as glue_client,
+            await AwsLogsHook(
+                aws_conn_id=self.aws_conn_id, region_name=self.region_name
+            ).get_async_conn() as logs_client,
+        ):
+            # Get log group name from job run metadata and initial state in 
one call
+            job_run_resp = await 
glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
+            log_group_prefix = job_run_resp["JobRun"].get("LogGroupName", 
"/aws-glue/jobs")
+            log_group_output = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
+            log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
+
+            output_token: str | None = None
+            error_token: str | None = None
+            job_run_state = job_run_resp["JobRun"]["JobRunState"]
+
+            for _attempt in range(self.attempts):
+                # Fetch and print logs from both output and error streams
+                output_token = await self._forward_logs(
+                    logs_client, log_group_output, self.run_id, output_token
+                )
+                error_token = await self._forward_logs(logs_client, 
log_group_error, self.run_id, error_token)
+
+                if job_run_state in ("FAILED", "TIMEOUT"):
+                    raise AirflowException(
+                        f"Glue Job {self.job_name} Run {self.run_id} exited 
with state: {job_run_state}"
+                    )
+                if job_run_state in ("SUCCEEDED", "STOPPED"):
+                    self.log.info(
+                        "Exiting Job %s Run %s State: %s",
+                        self.job_name,
+                        self.run_id,
+                        job_run_state,
+                    )
+                    yield TriggerEvent({"status": "success", self.return_key: 
self.return_value})
+                    return
+
+                self.log.info(
+                    "Polling for AWS Glue Job %s current run state: %s",
+                    self.job_name,
+                    job_run_state,
+                )
+                await asyncio.sleep(self.waiter_delay)
+
+                # Fetch updated state for next iteration
+                resp = await glue_client.get_job_run(JobName=self.job_name, 
RunId=self.run_id)
+                job_run_state = resp["JobRun"]["JobRunState"]
+
+            raise AirflowException(
+                f"Glue Job {self.job_name} Run {self.run_id} waiter exceeded 
max attempts ({self.attempts})"
+            )
+
+    async def _forward_logs(
+        self,
+        logs_client: Any,
+        log_group: str,
+        log_stream: str,
+        next_token: str | None,
+    ) -> str | None:
+        # Matches the format used by the synchronous 
GlueJobHook.print_job_logs.

Review Comment:
   Is it possible to reuse any code between the two? Otherwise they will get
out of sync with each other regularly. In the past, we've extracted all the
synchronous code into helper methods, used those in both places, and then
added only the async-specific calls here.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to