vandonr-amz commented on code in PR #30886:
URL: https://github.com/apache/airflow/pull/30886#discussion_r1179781354


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -194,46 +198,61 @@ def print_job_logs(
         self,
         job_name: str,
         run_id: str,
-        job_failed: bool = False,
-        next_token: str | None = None,
-    ) -> str | None:
-        """Prints the batch of logs to the Airflow task log and returns 
nextToken."""
-        log_client = boto3.client("logs")
-        response = {}
+        continuation_tokens: LogContinuationTokens,
+    ):
+        """
+        Prints the batch of logs to the Airflow task log and returns nextToken.
 
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else 
DEFAULT_LOG_FILTER
-        log_group_prefix = self.conn.get_job_run(JobName=job_name, 
RunId=run_id)["JobRun"]["LogGroupName"]
-        log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else 
DEFAULT_LOG_SUFFIX
-        log_group_name = f"{log_group_prefix}/{log_group_suffix}"
+        :param continuation_tokens: the tokens where to resume from when 
reading logs.
+            The object gets updated with the new tokens by this method.
+        """
+        log_client = boto3.client("logs")
+        paginator = log_client.get_paginator("filter_log_events")
 
-        try:
-            if next_token:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
+        def display_logs_from(log_group: str, continuation_token: str | None) -> str | None:
+            """Internal helper to share the iteration logic over the 2 different log streams glue jobs write to"""
+            fetched_logs = []
+            next_token = continuation_token
+            try:
+                for response in paginator.paginate(
+                    logGroupName=log_group,
                     logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                    nextToken=next_token,
-                )
+                    PaginationConfig={"StartingToken": continuation_token},
+                ):
+                    fetched_logs.extend([event["message"] for event in 
response["events"]])
+                    # if the response is empty there is no nextToken in it
+                    next_token = response.get("nextToken") or next_token
+            except ClientError as e:
+                if e.response["Error"]["Code"] == "ResourceNotFoundException":
+                    # we land here when the log groups/streams don't exist yet
+                    self.log.warning(
+                        "No new Glue driver logs so far.\nIf this persists, check the CloudWatch dashboard "
+                        f"at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home"
+                    )
+                else:
+                    raise
+
+            if len(fetched_logs):
+                # Add a tab to indent those logs and distinguish them from 
airflow logs.
+                # Log lines returned already contain a newline character at 
the end.
+                messages = "\t".join(fetched_logs)
+                self.log.info("Glue Job Run %s Logs:\n\t%s", log_group, 
messages)
             else:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                )
-            if len(response["events"]):
-                messages = "\t".join([event["message"] for event in 
response["events"]])
-                self.log.info("Glue Job Run Logs:\n\t%s", messages)
-
-        except log_client.exceptions.ResourceNotFoundException:
-            self.log.warning(
-                "No new Glue driver logs found. This might be because there are no new logs, "
-                "or might be an error.\nIf the error persists, check the CloudWatch dashboard "
-                f"at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home"
-            )
+                self.log.info("No new log from the Glue Job in %s", log_group)
+            return next_token
 
-        # If no new log events are available, filter_log_events will return 
None.
-        # In that case, check the same token again next pass.
-        return response.get("nextToken") or next_token
+        log_group_prefix = self.conn.get_job_run(JobName=job_name, 
RunId=run_id)["JobRun"]["LogGroupName"]
+        log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
+        log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
+
+        # one would think that the error log group would contain only errors, 
but it actually contains
+        # a lot of interesting logs too, so it's valuable to have both
+        continuation_tokens.output_stream_continuation = display_logs_from(
+            log_group_default, continuation_tokens.output_stream_continuation
+        )
+        continuation_tokens.error_stream_continuation = display_logs_from(
+            log_group_error, continuation_tokens.error_stream_continuation
+        )

Review Comment:
   sooo... does it mean you approve this PR? 😁



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to