shivaam commented on code in PR #63086:
URL: https://github.com/apache/airflow/pull/63086#discussion_r2902486633
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py:
##########
@@ -87,6 +96,110 @@ def hook(self) -> AwsGenericHook:
config=self.botocore_config,
)
+ async def run(self) -> AsyncIterator[TriggerEvent]:
+ if not self.verbose:
+ async for event in super().run():
+ yield event
+ return
+
+ hook = self.hook()
+ async with (
+ await hook.get_async_conn() as glue_client,
+ await AwsLogsHook(
+ aws_conn_id=self.aws_conn_id, region_name=self.region_name
+ ).get_async_conn() as logs_client,
+ ):
+            # Get log group name from job run metadata and initial state in one call
+            job_run_resp = await glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
+            log_group_prefix = job_run_resp["JobRun"].get("LogGroupName", "/aws-glue/jobs")
+ log_group_output = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
+ log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
+
+ output_token: str | None = None
+ error_token: str | None = None
+ job_run_state = job_run_resp["JobRun"]["JobRunState"]
+
+ for _attempt in range(self.attempts):
+ # Fetch and print logs from both output and error streams
+ output_token = await self._forward_logs(
+ logs_client, log_group_output, self.run_id, output_token
+ )
+                error_token = await self._forward_logs(logs_client, log_group_error, self.run_id, error_token)
+
+ if job_run_state in ("FAILED", "TIMEOUT"):
+ raise AirflowException(
+                        f"Glue Job {self.job_name} Run {self.run_id} exited with state: {job_run_state}"
+ )
+ if job_run_state in ("SUCCEEDED", "STOPPED"):
+ self.log.info(
+ "Exiting Job %s Run %s State: %s",
+ self.job_name,
+ self.run_id,
+ job_run_state,
+ )
+                    yield TriggerEvent({"status": "success", self.return_key: self.return_value})
+ return
+
+ self.log.info(
+ "Polling for AWS Glue Job %s current run state: %s",
+ self.job_name,
+ job_run_state,
+ )
+ await asyncio.sleep(self.waiter_delay)
+
+ # Fetch updated state for next iteration
+            resp = await glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
+ job_run_state = resp["JobRun"]["JobRunState"]
+
+ raise AirflowException(
            f"Glue Job {self.job_name} Run {self.run_id} waiter exceeded max attempts ({self.attempts})"
+ )
+
+ async def _forward_logs(
+ self,
+ logs_client: Any,
+ log_group: str,
+ log_stream: str,
+ next_token: str | None,
+ ) -> str | None:
+ """Fetch new CloudWatch log events and print them.
+ Matches the format used by the synchronous GlueJobHook.print_job_logs.
+ """
+ fetched_logs: list[str] = []
+ while True:
+            token_arg: dict[str, str] = {"nextToken": next_token} if next_token else {}
+ try:
+ response = await logs_client.get_log_events(
+ logGroupName=log_group,
+ logStreamName=log_stream,
+ startFromHead=True,
+ **token_arg,
+ )
+ except ClientError as e:
+ if e.response["Error"]["Code"] == "ResourceNotFoundException":
+ self.log.warning(
Review Comment:
Out of scope for this PR, but in a future PR I propose that we clean up the
log output and stop printing output for the error stream when it has no logs.
```
INFO - Streaming logs from:
output: /aws-glue/python-jobs/output
error: /aws-glue/python-jobs/error
INFO - Polling for AWS Glue Job test-job current run state: RUNNING
INFO - [output] Processing step 1/20...
INFO - Polling for AWS Glue Job test-job current run state: RUNNING
INFO - [output] Processing step 2/20...
INFO - [error] Some runtime warning
INFO - Polling for AWS Glue Job test-job current run state: RUNNING
INFO - Exiting Job test-job Run jr_xxx State: SUCCEEDED
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]