This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9221f797ba2 add startTime to paginator.paginate when fetching logs in GlueJobHook (#46950)
9221f797ba2 is described below
commit 9221f797ba2a65c3c683b06dfdb3b7cf16e04127
Author: Kalyan R <[email protected]>
AuthorDate: Thu Feb 27 03:49:20 2025 +0530
add startTime to paginator.paginate when fetching logs in GlueJobHook (#46950)
* add startTime to paginator using startTime from job details
Co-authored-by: D. Ferruzzi <[email protected]>
---
providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
index 80c05feca92..d025652d0a8 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
@@ -236,6 +236,9 @@ class GlueJobHook(AwsBaseHook):
"""
log_client = self.logs_hook.get_conn()
paginator = log_client.get_paginator("filter_log_events")
+ job_run = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
+ # StartTime needs to be an int and is Epoch time in milliseconds
+ start_time = int(job_run["StartedOn"].timestamp() * 1000)
def display_logs_from(log_group: str, continuation_token: str | None) -> str | None:
"""Mutualize iteration over the 2 different log streams glue jobs write to."""
@@ -245,6 +248,7 @@ class GlueJobHook(AwsBaseHook):
for response in paginator.paginate(
logGroupName=log_group,
logStreamNames=[run_id],
+ startTime=start_time,
PaginationConfig={"StartingToken": continuation_token},
):
fetched_logs.extend([event["message"] for event in response["events"]])
@@ -270,7 +274,7 @@ class GlueJobHook(AwsBaseHook):
self.log.info("No new log from the Glue Job in %s", log_group)
return next_token
- log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["LogGroupName"]
+ log_group_prefix = job_run["LogGroupName"]
log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
# one would think that the error log group would contain only errors, but it actually contains