This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9221f797ba2 add startTime to paginator.paginate when fetching logs in 
GlueJobHook (#46950)
9221f797ba2 is described below

commit 9221f797ba2a65c3c683b06dfdb3b7cf16e04127
Author: Kalyan R <[email protected]>
AuthorDate: Thu Feb 27 03:49:20 2025 +0530

    add startTime to paginator.paginate when fetching logs in GlueJobHook 
(#46950)
    
    * add startTime to paginator using startTime from job details
    
    Co-authored-by: D. Ferruzzi <[email protected]>
---
 providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py 
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
index 80c05feca92..d025652d0a8 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
@@ -236,6 +236,9 @@ class GlueJobHook(AwsBaseHook):
         """
         log_client = self.logs_hook.get_conn()
         paginator = log_client.get_paginator("filter_log_events")
+        job_run = self.conn.get_job_run(JobName=job_name, 
RunId=run_id)["JobRun"]
+        # StartTime needs to be an int and is Epoch time in milliseconds
+        start_time = int(job_run["StartedOn"].timestamp() * 1000)
 
         def display_logs_from(log_group: str, continuation_token: str | None) 
-> str | None:
             """Mutualize iteration over the 2 different log streams glue jobs 
write to."""
@@ -245,6 +248,7 @@ class GlueJobHook(AwsBaseHook):
                 for response in paginator.paginate(
                     logGroupName=log_group,
                     logStreamNames=[run_id],
+                    startTime=start_time,
                     PaginationConfig={"StartingToken": continuation_token},
                 ):
                     fetched_logs.extend([event["message"] for event in 
response["events"]])
@@ -270,7 +274,7 @@ class GlueJobHook(AwsBaseHook):
                 self.log.info("No new log from the Glue Job in %s", log_group)
             return next_token
 
-        log_group_prefix = self.conn.get_job_run(JobName=job_name, 
RunId=run_id)["JobRun"]["LogGroupName"]
+        log_group_prefix = job_run["LogGroupName"]
         log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
         log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
         # one would think that the error log group would contain only errors, 
but it actually contains

Reply via email to