This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3fc2f02fa8b Fix GlueJobHook failing to update a Glue job that has tags (#68711)
3fc2f02fa8b is described below
commit 3fc2f02fa8baab99296f13f275bb2200504904db
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Jun 19 22:57:47 2026 +0900
Fix GlueJobHook failing to update a Glue job that has tags (#68711)
Signed-off-by: PoAn Yang <[email protected]>
---
.../src/airflow/providers/amazon/aws/hooks/glue.py | 32 +++++++++-
.../tests/unit/amazon/aws/hooks/test_glue.py | 69 ++++++++++++++++++++++
2 files changed, 100 insertions(+), 1 deletion(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
index 0e58d605689..6656c3fedb3 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
@@ -36,6 +36,7 @@ from tenacity import (
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
+from airflow.providers.amazon.aws.hooks.sts import StsHook
from airflow.providers.common.compat.sdk import AirflowException
DEFAULT_LOG_SUFFIX = "output"
@@ -485,6 +486,8 @@ class GlueJobHook(AwsBaseHook):
         :return: True if job was updated and false otherwise
         """
         job_name = job_kwargs.pop("Name")
+        # Glue ``update_job`` does not accept ``Tags`` in ``JobUpdate``; reconcile them separately.
+        tags_updated = self.update_tags(job_name, job_kwargs.pop("Tags")) if "Tags" in job_kwargs else False
         current_job = self.conn.get_job(JobName=job_name)["Job"]
 
         update_config = {
@@ -495,7 +498,34 @@ class GlueJobHook(AwsBaseHook):
             self.conn.update_job(JobName=job_name, JobUpdate=job_kwargs)
             self.log.info("Updated configurations: %s", update_config)
             return True
-        return False
+        return tags_updated
+
+    def update_tags(self, job_name: str, job_tags: dict) -> bool:
+        """
+        Reconcile a job's tags with the desired set.
+
+        Glue manages tags outside of ``update_job``, so adds/updates go through
+        ``tag_resource`` and removals through ``untag_resource``.
+
+        .. seealso::
+            - :external+boto3:py:meth:`Glue.Client.tag_resource`
+            - :external+boto3:py:meth:`Glue.Client.untag_resource`
+
+        :param job_name: Name of the job for which to update tags
+        :param job_tags: Desired tags. Keys absent from this mapping are removed from the job.
+        :return: True if any tag was added, changed, or removed, False otherwise
+        """
+        account_number = StsHook(aws_conn_id=self.aws_conn_id).get_account_number()
+        job_arn = f"arn:{self.conn_partition}:glue:{self.conn_region_name}:{account_number}:job/{job_name}"
+        current_tags: dict = self.conn.get_tags(ResourceArn=job_arn)["Tags"]
+
+        if tags_to_add := {key: value for key, value in job_tags.items() if current_tags.get(key) != value}:
+            self.log.info("Updating job tags: %s", job_name)
+            self.conn.tag_resource(ResourceArn=job_arn, TagsToAdd=tags_to_add)
+        if tags_to_remove := [key for key in current_tags if key not in job_tags]:
+            self.log.info("Removing job tags: %s", job_name)
+            self.conn.untag_resource(ResourceArn=job_arn, TagsToRemove=tags_to_remove)
+        return bool(tags_to_add or tags_to_remove)
 
     def get_or_create_glue_job(self) -> str | None:
         """
diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_glue.py
b/providers/amazon/tests/unit/amazon/aws/hooks/test_glue.py
index 3e78b296042..1b3dda0c881 100644
--- a/providers/amazon/tests/unit/amazon/aws/hooks/test_glue.py
+++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_glue.py
@@ -26,6 +26,7 @@ import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_aws
+from moto.core import DEFAULT_ACCOUNT_ID
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook,
GlueJobHook
@@ -310,6 +311,74 @@ class TestGlueJobHook:
)
assert result == job_name
+    @mock_aws
+    @pytest.mark.parametrize(
+        ("current_tags", "desired_tags", "expected_tags", "expected_changed"),
+        [
+            pytest.param(
+                {"env": "dev"},
+                {"env": "dev", "team": "data"},
+                {"env": "dev", "team": "data"},
+                True,
+                id="add-new-tag",
+            ),
+            pytest.param({"env": "dev"}, {"env": "prod"}, {"env": "prod"}, True, id="replace-value"),
+            pytest.param(
+                {"env": "dev", "team": "data"}, {"env": "dev"}, {"env": "dev"}, True, id="remove-tag"
+            ),
+            pytest.param({"env": "dev"}, {}, {}, True, id="remove-all-tags"),
+            pytest.param({"env": "dev"}, {"env": "dev"}, {"env": "dev"}, False, id="no-change"),
+        ],
+    )
+    def test_update_tags(self, current_tags, desired_tags, expected_tags, expected_changed):
+        """Tags are reconciled through the tag API because Glue ``update_job`` cannot modify them."""
+        job_name = "aws_test_glue_job_with_tags"
+        boto3.client("glue", region_name=self.some_aws_region).create_job(
+            Name=job_name,
+            Role="test-role",
+            Command={"Name": "glueetl", "ScriptLocation": "s3://glue-examples/script.py"},
+            Tags=current_tags,
+        )
+        hook = GlueJobHook(job_name=job_name, region_name=self.some_aws_region)
+
+        assert hook.update_tags(job_name, desired_tags) is expected_changed
+
+        job_arn = f"arn:aws:glue:{self.some_aws_region}:{DEFAULT_ACCOUNT_ID}:job/{job_name}"
+        assert hook.conn.get_tags(ResourceArn=job_arn)["Tags"] == expected_tags
+
+    @mock.patch.object(GlueJobHook, "update_tags")
+    @mock.patch.object(AwsBaseHook, "conn")
+    def test_update_job_keeps_tags_out_of_job_update(self, mock_conn, mock_update_tags):
+        """``Tags`` must be stripped from ``JobUpdate`` and reconciled separately."""
+        job_name = "aws_test_glue_job"
+        mock_conn.get_job.return_value = {"Job": {"Name": job_name, "Description": "old"}}
+        mock_update_tags.return_value = False
+        hook = GlueJobHook(job_name=job_name, region_name=self.some_aws_region)
+
+        updated = hook.update_job(Name=job_name, Description="new", Tags={"env": "prod"})
+
+        mock_update_tags.assert_called_once_with(job_name, {"env": "prod"})
+        mock_conn.update_job.assert_called_once_with(JobName=job_name, JobUpdate={"Description": "new"})
+        assert updated is True
+
+    @pytest.mark.parametrize("tags_changed", [True, False])
+    @mock.patch.object(GlueJobHook, "update_tags")
+    @mock.patch.object(AwsBaseHook, "conn")
+    def test_update_job_returns_tag_result_when_config_unchanged(
+        self, mock_conn, mock_update_tags, tags_changed
+    ):
+        """With no config diff, the result reflects whether only the tags changed."""
+        job_name = "aws_test_glue_job"
+        mock_conn.get_job.return_value = {"Job": {"Name": job_name, "Description": "same"}}
+        mock_update_tags.return_value = tags_changed
+        hook = GlueJobHook(job_name=job_name, region_name=self.some_aws_region)
+
+        updated = hook.update_job(Name=job_name, Description="same", Tags={"env": "prod"})
+
+        mock_update_tags.assert_called_once_with(job_name, {"env": "prod"})
+        mock_conn.update_job.assert_not_called()
+        assert updated is tags_changed
+
@mock_aws
@mock.patch.object(GlueJobHook, "get_iam_execution_role")
def test_create_or_update_glue_job_worker_type(self,
mock_get_iam_execution_role):