rawwar commented on issue #44694:
URL: https://github.com/apache/airflow/issues/44694#issuecomment-2520184962
I modified the `print_job_logs` method of the Glue hook (`glue.py` in AWS provider 9.1.0)
and added a few print statements. Here are the updated method and the task logs:
```
def print_job_logs(
    self,
    job_name: str,
    run_id: str,
    continuation_tokens: LogContinuationTokens,
):
    """
    Print the latest job logs to the Airflow task log and update the continuation tokens.

    Debug-instrumented copy: the ``print`` calls trace each step of the log fetch.

    :param job_name: name of the Glue job; used with ``run_id`` to look up the run's log group.
    :param run_id: Glue job run id; also used as the CloudWatch log stream name to read.
    :param continuation_tokens: the tokens where to resume from when reading logs.
        The object gets updated with the new tokens by this method.
    """
    log_client = self.logs_hook.get_conn()
    paginator = log_client.get_paginator("filter_log_events")

    def display_logs_from(log_group: str, continuation_token: str | None) -> str | None:
        """
        Log all new events of ``run_id``'s stream in ``log_group``, starting at ``continuation_token``.

        Shared by the two log groups Glue jobs write to. Returns the last ``nextToken`` seen,
        or ``continuation_token`` unchanged when no page carried one.
        """
        print(f"display_logs_from start with log_group={log_group}, continuation_token={continuation_token}")
        fetched_logs = []
        next_token = continuation_token
        try:
            for response in paginator.paginate(
                logGroupName=log_group,
                logStreamNames=[run_id],
                PaginationConfig={"StartingToken": continuation_token},
            ):
                print("paginator response", response)
                fetched_logs.extend(event["message"] for event in response["events"])
                # if the response is empty there is no nextToken in it
                next_token = response.get("nextToken") or next_token
                print("fetched_logs", fetched_logs)
                print("next_token", next_token)
        except ClientError as e:
            if e.response["Error"]["Code"] == "ResourceNotFoundException":
                # we land here when the log groups/streams don't exist yet
                self.log.warning(
                    "No new Glue driver logs so far.\n"
                    "If this persists, check the CloudWatch dashboard at: %r.",
                    f"https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home",
                )
            else:
                print("error", e)
                raise
        print("finished paginator")
        if fetched_logs:
            # Add a tab to indent those logs and distinguish them from airflow logs.
            # Log lines returned already contain a newline character at the end.
            messages = "\t".join(fetched_logs)
            self.log.info("Glue Job Run %s Logs:\n\t%s", log_group, messages)
        else:
            self.log.info("No new log from the Glue Job in %s", log_group)
        return next_token

    log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["LogGroupName"]
    log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
    log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
    print(
        f"log_group_prefix={log_group_prefix}, "
        f"log_group_default={log_group_default}, log_group_error={log_group_error}"
    )
    # one would think that the error log group would contain only errors, but it actually contains
    # a lot of interesting logs too, so it's valuable to have both
    print("before display_logs_from")
    continuation_tokens.output_stream_continuation = display_logs_from(
        log_group_default, continuation_tokens.output_stream_continuation
    )
    print("After")
    continuation_tokens.error_stream_continuation = display_logs_from(
        log_group_error, continuation_tokens.error_stream_continuation
    )
    print("Done")
```
Task logs are attached:
[task logs.log](https://github.com/user-attachments/files/18023154/task.logs.log)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]