rawwar commented on issue #44694:
URL: https://github.com/apache/airflow/issues/44694#issuecomment-2520184962

   I modified the `print_job_logs` method of the Glue hook (`glue.py`, AWS
provider 9.1.0) and added a few print statements. Here's the updated method
and the task logs:
   
   ```
   def print_job_logs(
       self,
       job_name: str,
       run_id: str,
       continuation_tokens: LogContinuationTokens,
   ):
       """
       Print the latest job logs to the Airflow task log and updates the continuation tokens.

       :param continuation_tokens: the tokens where to resume from when reading logs.
           The object gets updated with the new tokens by this method.
       """
       log_client = self.logs_hook.get_conn()
       paginator = log_client.get_paginator("filter_log_events")

       def display_logs_from(log_group: str, continuation_token: str | None) -> str | None:
           """Mutualize iteration over the 2 different log streams glue jobs write to."""
           print(f"display_logs_from start with log_group={log_group}, continuation_token={continuation_token}")
           fetched_logs = []
           next_token = continuation_token
           try:
               for response in paginator.paginate(
                   logGroupName=log_group,
                   logStreamNames=[run_id],
                   PaginationConfig={"StartingToken": continuation_token},
               ):
                   print("paginator response", response)
                   fetched_logs.extend([event["message"] for event in response["events"]])
                   # if the response is empty there is no nextToken in it
                   next_token = response.get("nextToken") or next_token
                   print("fetched_logs", fetched_logs)
                   print("next_token", next_token)
           except ClientError as e:
               if e.response["Error"]["Code"] == "ResourceNotFoundException":
                   # we land here when the log groups/streams don't exist yet
                   self.log.warning(
                       "No new Glue driver logs so far.\n"
                       "If this persists, check the CloudWatch dashboard at: %r.",
                       f"https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home",
                   )
               else:
                   print("error", e)
                   raise
           print("finished paginator")
           if len(fetched_logs):
               # Add a tab to indent those logs and distinguish them from airflow logs.
               # Log lines returned already contain a newline character at the end.
               messages = "\t".join(fetched_logs)
               self.log.info("Glue Job Run %s Logs:\n\t%s", log_group, messages)
           else:
               self.log.info("No new log from the Glue Job in %s", log_group)
           return next_token

       log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["LogGroupName"]
       log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
       log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
       print(f"log_group_prefix={log_group_prefix}, log_group_default={log_group_default}, log_group_error={log_group_error}")
       # one would think that the error log group would contain only errors, but it actually contains
       # a lot of interesting logs too, so it's valuable to have both
       print("before display_logs_from")
       continuation_tokens.output_stream_continuation = display_logs_from(
           log_group_default, continuation_tokens.output_stream_continuation
       )
       print("After")
       continuation_tokens.error_stream_continuation = display_logs_from(
           log_group_error, continuation_tokens.error_stream_continuation
       )
       print("Done")
   ```
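
For anyone who wants to check the token handling without an AWS account, here is a minimal sketch of the loop inside `display_logs_from`, run against hand-written pages. The `collect_messages` helper and the `fake_pages` data are hypothetical stand-ins, not part of the provider; the pages are only assumed to be shaped like `filter_log_events` responses (an `events` list and an optional `nextToken`).

```python
from typing import Iterable, Optional


def collect_messages(pages: Iterable[dict], continuation_token: Optional[str]):
    """Hypothetical helper mirroring the loop body of display_logs_from."""
    fetched_logs = []
    next_token = continuation_token
    for response in pages:
        fetched_logs.extend(event["message"] for event in response["events"])
        # An empty response carries no nextToken, so keep the previous token.
        next_token = response.get("nextToken") or next_token
    return fetched_logs, next_token


# Made-up pages, assumed to resemble filter_log_events responses.
fake_pages = [
    {"events": [{"message": "line 1\n"}], "nextToken": "tok-1"},
    {"events": [{"message": "line 2\n"}], "nextToken": "tok-2"},
    {"events": []},  # last page: no events and no nextToken
]

logs, token = collect_messages(fake_pages, None)
print(logs, token)
```

With this stub the last non-empty token is carried past the empty final page, which is the behaviour the `or next_token` fallback is meant to give. It says nothing about what the real paginator returns, which is what the print statements above are for.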
   
   
   Task logs are attached:
   [task logs.log](https://github.com/user-attachments/files/18023154/task.logs.log)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.