This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e55d90f0692 fix(glue): Fix GlueJobOperator verbose logs not showing in
deferrable mode (#64342)
e55d90f0692 is described below
commit e55d90f0692ffca5b3b6f98b02e37ce0e3ca350b
Author: Shivam Rastogi <[email protected]>
AuthorDate: Fri Apr 10 10:10:55 2026 -0700
fix(glue): Fix GlueJobOperator verbose logs not showing in deferrable mode
(#64342)
When using GlueJobOperator with deferrable=True and verbose=True,
CloudWatch logs were silently ignored because the trigger inherited
the base waiter's run() method, which only polls job status.
This adds a run() override and a _forward_logs() helper to
GlueJobCompleteTrigger that stream logs from both the output and error
CloudWatch log groups, matching the format used by the synchronous path.
Key changes:
- Extract get_glue_log_group_names() and format_glue_logs() as shared
helpers in hooks/glue.py to eliminate sync/async duplication
- Override run() in GlueJobCompleteTrigger for verbose log streaming
- Yield TriggerEvent with error status on failure (matching base class
pattern) instead of raising AirflowException in the triggerer process
- Add tests for verbose success, failure, max attempts, pagination,
ResourceNotFoundException, and no-new-events scenarios
closes: #56535
---
.../src/airflow/providers/amazon/aws/hooks/glue.py | 36 ++-
.../airflow/providers/amazon/aws/triggers/glue.py | 135 +++++++++++-
.../tests/unit/amazon/aws/triggers/test_glue.py | 245 +++++++++++++++++++++
3 files changed, 404 insertions(+), 12 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
index cc7cfd2849c..0e58d605689 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
@@ -42,6 +42,28 @@ DEFAULT_LOG_SUFFIX = "output"
ERROR_LOG_SUFFIX = "error"
+def get_glue_log_group_names(job_run: dict[str, Any]) -> tuple[str, str]:
+ """Extract the output and error CloudWatch log group names from a Glue job
run response."""
+ log_group_prefix = job_run["LogGroupName"]
+ return (
+ f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}",
+ f"{log_group_prefix}/{ERROR_LOG_SUFFIX}",
+ )
+
+
+def format_glue_logs(fetched_logs: list[str], log_group: str) -> str:
+ """
+ Format fetched CloudWatch log messages for display.
+
+ Shared between ``GlueJobHook.print_job_logs`` and
``GlueJobCompleteTrigger._forward_logs``
+ so that both the sync and async paths produce identical output.
+ """
+ if fetched_logs:
+ messages = "\t".join(line.rstrip() + "\n" for line in fetched_logs)
+ return f"Glue Job Run {log_group} Logs:\n\t{messages}"
+ return f"No new log from the Glue Job in {log_group}"
+
+
class GlueJobHook(AwsBaseHook):
"""
Interact with AWS Glue.
@@ -350,22 +372,14 @@ class GlueJobHook(AwsBaseHook):
else:
raise
- if len(fetched_logs):
- # Add a tab to indent those logs and distinguish them from
airflow logs.
- # Log lines returned already contain a newline character at
the end.
- messages = "\t".join(fetched_logs)
- self.log.info("Glue Job Run %s Logs:\n\t%s", log_group,
messages)
- else:
- self.log.info("No new log from the Glue Job in %s", log_group)
+ self.log.info(format_glue_logs(fetched_logs, log_group))
return next_token
- log_group_prefix = job_run["LogGroupName"]
- log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
- log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
+ log_group_output, log_group_error = get_glue_log_group_names(job_run)
# one would think that the error log group would contain only errors,
but it actually contains
# a lot of interesting logs too, so it's valuable to have both
continuation_tokens.output_stream_continuation = display_logs_from(
- log_group_default, continuation_tokens.output_stream_continuation
+ log_group_output, continuation_tokens.output_stream_continuation
)
continuation_tokens.error_stream_continuation = display_logs_from(
log_group_error, continuation_tokens.error_stream_continuation
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
index f54f761825e..e499763aafa 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
@@ -22,11 +22,19 @@ from collections.abc import AsyncIterator
from functools import cached_property
from typing import TYPE_CHECKING, Any
+from botocore.exceptions import ClientError
+
if TYPE_CHECKING:
from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
-from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook,
GlueJobHook
+from airflow.providers.amazon.aws.hooks.glue import (
+ GlueDataQualityHook,
+ GlueJobHook,
+ format_glue_logs,
+ get_glue_log_group_names,
+)
from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
+from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
from airflow.triggers.base import BaseTrigger, TriggerEvent
@@ -87,6 +95,131 @@ class GlueJobCompleteTrigger(AwsBaseWaiterTrigger):
config=self.botocore_config,
)
+ async def run(self) -> AsyncIterator[TriggerEvent]:
+ if not self.verbose:
+ async for event in super().run():
+ yield event
+ return
+
+ hook = self.hook()
+ async with (
+ await hook.get_async_conn() as glue_client,
+ await AwsLogsHook(
+ aws_conn_id=self.aws_conn_id, region_name=self.region_name
+ ).get_async_conn() as logs_client,
+ ):
+ # Get log group names from job run metadata
+ job_run_resp = await
glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
+ log_group_output, log_group_error =
get_glue_log_group_names(job_run_resp["JobRun"])
+
+ output_token: str | None = None
+ error_token: str | None = None
+
+ for _attempt in range(self.attempts):
+ # Fetch current job state
+ resp = await glue_client.get_job_run(JobName=self.job_name,
RunId=self.run_id)
+ job_run_state = resp["JobRun"]["JobRunState"]
+
+ # Fetch and print logs from both output and error streams
+ try:
+ output_token = await self._forward_logs(
+ logs_client, log_group_output, self.run_id,
output_token
+ )
+ error_token = await self._forward_logs(
+ logs_client, log_group_error, self.run_id, error_token
+ )
+ except ClientError as e:
+ self.log.error(
+ "Failed to fetch logs for Glue Job %s Run %s: %s",
+ self.job_name,
+ self.run_id,
+ e,
+ )
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Failed to fetch logs for Glue Job
{self.job_name} Run {self.run_id}: {e}",
+ self.return_key: self.return_value,
+ }
+ )
+ return
+
+ if job_run_state in ("FAILED", "TIMEOUT"):
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Glue Job {self.job_name} Run
{self.run_id}"
+ f" exited with state: {job_run_state}",
+ self.return_key: self.return_value,
+ }
+ )
+ return
+ if job_run_state in ("SUCCEEDED", "STOPPED"):
+ self.log.info(
+ "Exiting Job %s Run %s State: %s",
+ self.job_name,
+ self.run_id,
+ job_run_state,
+ )
+ yield TriggerEvent({"status": "success", self.return_key:
self.return_value})
+ return
+
+ self.log.info(
+ "Polling for AWS Glue Job %s current run state: %s",
+ self.job_name,
+ job_run_state,
+ )
+ await asyncio.sleep(self.waiter_delay)
+
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Glue Job {self.job_name} Run {self.run_id}"
+ f" waiter exceeded max attempts ({self.attempts})",
+ self.return_key: self.return_value,
+ }
+ )
+
+ async def _forward_logs(
+ self,
+ logs_client: Any,
+ log_group: str,
+ log_stream: str,
+ next_token: str | None,
+ ) -> str | None:
+ # Matches the format used by the synchronous
GlueJobHook.print_job_logs.
+ fetched_logs: list[str] = []
+ while True:
+ token_arg: dict[str, str] = {"nextToken": next_token} if
next_token else {}
+ try:
+ response = await logs_client.get_log_events(
+ logGroupName=log_group,
+ logStreamName=log_stream,
+ startFromHead=True,
+ **token_arg,
+ )
+ except ClientError as e:
+ if e.response["Error"]["Code"] == "ResourceNotFoundException":
+ region = logs_client.meta.region_name
+ self.log.warning(
+ "No new Glue driver logs so far.\n"
+ "If this persists, check the CloudWatch dashboard at:
%r.",
+
f"https://{region}.console.aws.amazon.com/cloudwatch/home",
+ )
+ return None
+ raise
+
+ events = response["events"]
+ fetched_logs.extend(event["message"] for event in events)
+
+ if not events or next_token == response["nextForwardToken"]:
+ break
+ next_token = response["nextForwardToken"]
+
+ self.log.info(format_glue_logs(fetched_logs, log_group))
+
+ return response.get("nextForwardToken")
+
class GlueCatalogPartitionTrigger(BaseTrigger):
"""
diff --git a/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
b/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
index 4339d36ce38..61a1f8385e6 100644
--- a/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
+++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
@@ -21,9 +21,11 @@ from unittest import mock
from unittest.mock import AsyncMock
import pytest
+from botocore.exceptions import ClientError
from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook,
GlueJobHook
from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
+from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
from airflow.providers.amazon.aws.triggers.glue import (
GlueCatalogPartitionTrigger,
GlueDataQualityRuleRecommendationRunCompleteTrigger,
@@ -111,6 +113,249 @@ class TestGlueJobTrigger:
"waiter_delay": 10,
}
+ def test_serialization_verbose(self):
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="JobRunId",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_max_attempts=3,
+ waiter_delay=10,
+ )
+ classpath, kwargs = trigger.serialize()
+ assert kwargs["verbose"] is True
+
+ @pytest.mark.asyncio
+ @mock.patch.object(AwsLogsHook, "get_async_conn")
+ @mock.patch.object(GlueJobHook, "get_async_conn")
+ async def test_verbose_run_success(self, mock_glue_conn, mock_logs_conn):
+ """When verbose=True, the trigger polls job state and fetches
CloudWatch logs."""
+ glue_client = AsyncMock()
+ glue_client.get_job_run = AsyncMock(
+ side_effect=[
+ # First call: log group metadata
+ {"JobRun": {"JobRunState": "RUNNING", "LogGroupName":
"/aws-glue/python-jobs"}},
+ # Second call: state check at top of iteration 1 (RUNNING)
+ {"JobRun": {"JobRunState": "RUNNING", "LogGroupName":
"/aws-glue/python-jobs"}},
+ # Third call: state check at top of iteration 2 (SUCCEEDED)
+ {"JobRun": {"JobRunState": "SUCCEEDED", "LogGroupName":
"/aws-glue/python-jobs"}},
+ ]
+ )
+ mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
+ mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(
+ return_value={
+ "events": [{"timestamp": 1234, "message": "Processing step
1\n"}],
+ "nextForwardToken": "token_1",
+ }
+ )
+ mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
+ mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ generator = trigger.run()
+ event = await generator.asend(None)
+
+ assert event.payload["status"] == "success"
+ assert event.payload["run_id"] == "jr_123"
+ # Logs client was called for both output and error streams
+ assert logs_client.get_log_events.call_count >= 2
+
+ @pytest.mark.asyncio
+ @mock.patch.object(AwsLogsHook, "get_async_conn")
+ @mock.patch.object(GlueJobHook, "get_async_conn")
+ async def test_verbose_run_job_failed(self, mock_glue_conn,
mock_logs_conn):
+ """When verbose=True and the job fails, the trigger yields an error
event."""
+ glue_client = AsyncMock()
+ glue_client.get_job_run = AsyncMock(
+ return_value={"JobRun": {"JobRunState": "FAILED", "LogGroupName":
"/aws-glue/python-jobs"}}
+ )
+ mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
+ mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(return_value={"events": [],
"nextForwardToken": "token_1"})
+ mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
+ mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ generator = trigger.run()
+ event = await generator.asend(None)
+ assert event.payload["status"] == "error"
+ assert "FAILED" in event.payload["message"]
+ assert event.payload["run_id"] == "jr_123"
+
+ @pytest.mark.asyncio
+ @mock.patch.object(AwsLogsHook, "get_async_conn")
+ @mock.patch.object(GlueJobHook, "get_async_conn")
+ async def test_verbose_run_max_attempts(self, mock_glue_conn,
mock_logs_conn):
+ """When verbose=True and the job stays RUNNING past max attempts,
yields an error event."""
+ glue_client = AsyncMock()
+ glue_client.get_job_run = AsyncMock(
+ return_value={"JobRun": {"JobRunState": "RUNNING", "LogGroupName":
"/aws-glue/python-jobs"}}
+ )
+ mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
+ mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(return_value={"events": [],
"nextForwardToken": "token_1"})
+ mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
+ mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=2,
+ )
+ generator = trigger.run()
+ event = await generator.asend(None)
+ assert event.payload["status"] == "error"
+ assert "max attempts" in event.payload["message"]
+ assert event.payload["run_id"] == "jr_123"
+
+ @pytest.mark.asyncio
+ @mock.patch.object(AwsLogsHook, "get_async_conn")
+ @mock.patch.object(GlueJobHook, "get_async_conn")
+ async def test_verbose_run_cloudwatch_client_error(self, mock_glue_conn,
mock_logs_conn):
+ """When verbose=True and CloudWatch returns an unexpected ClientError,
yields error event."""
+ glue_client = AsyncMock()
+ glue_client.get_job_run = AsyncMock(
+ return_value={"JobRun": {"JobRunState": "RUNNING", "LogGroupName":
"/aws-glue/python-jobs"}}
+ )
+ mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
+ mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(
+ side_effect=ClientError(
+ {"Error": {"Code": "AccessDeniedException", "Message": "not
authorized"}},
+ "GetLogEvents",
+ )
+ )
+ mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
+ mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ generator = trigger.run()
+ event = await generator.asend(None)
+ assert event.payload["status"] == "error"
+ assert "Failed to fetch logs" in event.payload["message"]
+ assert "AccessDeniedException" in event.payload["message"]
+ assert event.payload["run_id"] == "jr_123"
+
+ @pytest.mark.asyncio
+ async def test_forward_logs_resource_not_found(self):
+ """_forward_logs handles ResourceNotFoundException gracefully and uses
resolved region in URL."""
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(
+ side_effect=ClientError(
+ {"Error": {"Code": "ResourceNotFoundException", "Message":
"not found"}},
+ "GetLogEvents",
+ )
+ )
+ logs_client.meta.region_name = "eu-west-1"
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ region_name=None,
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ with mock.patch.object(trigger.log, "warning") as mock_log_warning:
+ result = await trigger._forward_logs(logs_client,
"/aws-glue/python-jobs/output", "jr_123", None)
+ assert result is None
+ # Verify the URL uses the resolved region from the client, not
self.region_name (which is None)
+ url_arg = mock_log_warning.call_args[0][1]
+ assert "eu-west-1" in url_arg
+
+ @pytest.mark.asyncio
+ async def test_forward_logs_pagination(self):
+ """_forward_logs follows nextForwardToken and formats logs like the
sync path."""
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(
+ side_effect=[
+ {
+ "events": [{"timestamp": 1, "message": "line 1\n"}],
+ "nextForwardToken": "token_2",
+ },
+ {
+ "events": [{"timestamp": 2, "message": "line 2\n"}],
+ "nextForwardToken": "token_3",
+ },
+ {
+ "events": [],
+ "nextForwardToken": "token_3",
+ },
+ ]
+ )
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ with mock.patch.object(trigger.log, "info") as mock_log_info:
+ result = await trigger._forward_logs(logs_client,
"/aws-glue/python-jobs/output", "jr_123", None)
+ assert result == "token_3"
+ assert logs_client.get_log_events.call_count == 3
+ # Verify log format matches sync path: "Glue Job Run <log_group>
Logs:" with tab-indented lines
+ log_output = mock_log_info.call_args[0][0]
+ assert "Glue Job Run /aws-glue/python-jobs/output Logs:" in log_output
+ assert "\tline 1" in log_output
+ assert "\tline 2" in log_output
+
+ @pytest.mark.asyncio
+ async def test_forward_logs_no_new_events(self):
+ """_forward_logs logs 'No new log' when there are no events, matching
sync path."""
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(return_value={"events": [],
"nextForwardToken": "token_1"})
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ with mock.patch.object(trigger.log, "info") as mock_log_info:
+ result = await trigger._forward_logs(logs_client,
"/aws-glue/python-jobs/output", "jr_123", None)
+ assert result == "token_1"
+ log_output = mock_log_info.call_args[0][0]
+ assert "No new log from the Glue Job in /aws-glue/python-jobs/output"
in log_output
+
class TestGlueCatalogPartitionSensorTrigger:
@pytest.mark.asyncio