ferruzzi commented on code in PR #30886:
URL: https://github.com/apache/airflow/pull/30886#discussion_r1178374333
##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -194,46 +198,61 @@ def print_job_logs(
self,
job_name: str,
run_id: str,
- job_failed: bool = False,
- next_token: str | None = None,
- ) -> str | None:
- """Prints the batch of logs to the Airflow task log and returns
nextToken."""
- log_client = boto3.client("logs")
- response = {}
+ continuation_tokens: LogContinuationTokens,
+ ):
+ """
+ Prints the batch of logs to the Airflow task log and returns nextToken.
Review Comment:
Doesn't appear to return anything. Copypasta??
##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -194,46 +198,61 @@ def print_job_logs(
self,
job_name: str,
run_id: str,
- job_failed: bool = False,
- next_token: str | None = None,
- ) -> str | None:
- """Prints the batch of logs to the Airflow task log and returns
nextToken."""
- log_client = boto3.client("logs")
- response = {}
+ continuation_tokens: LogContinuationTokens,
+ ):
+ """
+ Prints the batch of logs to the Airflow task log and returns nextToken.
- filter_pattern = FAILURE_LOG_FILTER if job_failed else
DEFAULT_LOG_FILTER
- log_group_prefix = self.conn.get_job_run(JobName=job_name,
RunId=run_id)["JobRun"]["LogGroupName"]
- log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else
DEFAULT_LOG_SUFFIX
- log_group_name = f"{log_group_prefix}/{log_group_suffix}"
+ :param continuation_tokens: the tokens where to resume from when
reading logs.
+ The object gets updated with the new tokens by this method.
+ """
+ log_client = boto3.client("logs")
+ paginator = log_client.get_paginator("filter_log_events")
- try:
- if next_token:
- response = log_client.filter_log_events(
- logGroupName=log_group_name,
+ def display_logs_from(log_group: str, continuation_token: str | None)
-> str | None:
+ """Internal method to mutualize iteration over the 2 different log
streams glue jobs write to"""
+ fetched_logs = []
+ next_token = continuation_token
+ try:
+ for response in paginator.paginate(
+ logGroupName=log_group,
logStreamNames=[run_id],
- filterPattern=filter_pattern,
- nextToken=next_token,
- )
+ PaginationConfig={"StartingToken": continuation_token},
+ ):
+ fetched_logs.extend([event["message"] for event in
response["events"]])
+ # if the response is empty there is no nextToken in it
+ next_token = response.get("nextToken") or next_token
+ except ClientError as e:
+ if e.response["Error"]["Code"] == "ResourceNotFoundException":
+ # we land here when the log groups/streams don't exist yet
+ self.log.warning(
+ "No new Glue driver logs so far.\nIf this persists,
check the CloudWatch dashboard "
+ f"at:
https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home"
+ )
+ else:
+ raise
+
+ if len(fetched_logs):
+ # Add a tab to indent those logs and distinguish them from
airflow logs.
+ # Log lines returned already contain a newline character at
the end.
+ messages = "\t".join(fetched_logs)
+ self.log.info("Glue Job Run %s Logs:\n\t%s", log_group,
messages)
else:
- response = log_client.filter_log_events(
- logGroupName=log_group_name,
- logStreamNames=[run_id],
- filterPattern=filter_pattern,
- )
- if len(response["events"]):
- messages = "\t".join([event["message"] for event in
response["events"]])
- self.log.info("Glue Job Run Logs:\n\t%s", messages)
-
- except log_client.exceptions.ResourceNotFoundException:
- self.log.warning(
- "No new Glue driver logs found. This might be because there
are no new logs, "
- "or might be an error.\nIf the error persists, check the
CloudWatch dashboard "
- f"at:
https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home"
- )
+ self.log.info("No new log from the Glue Job in %s", log_group)
+ return next_token
- # If no new log events are available, filter_log_events will return
None.
- # In that case, check the same token again next pass.
- return response.get("nextToken") or next_token
+ log_group_prefix = self.conn.get_job_run(JobName=job_name,
RunId=run_id)["JobRun"]["LogGroupName"]
+ log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
+ log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
+
+ # one would think that the error log group would contain only errors,
but it actually contains
+ # a lot of interesting logs too, so it's valuable to have both
+ continuation_tokens.output_stream_continuation = display_logs_from(
+ log_group_default, continuation_tokens.output_stream_continuation
+ )
+ continuation_tokens.error_stream_continuation = display_logs_from(
+ log_group_error, continuation_tokens.error_stream_continuation
+ )
Review Comment:
(Non-blocking) This feels awkward to me, maybe update the token in the
display_logs method? Or maybe pass in `log_group_prefix` and
`continuation_tokens` and handle the string concatenation and token value update
in there too?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]