This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 76c93053290 Add `GlueCatalogBatchDeletePartitionOperator` (#66721)
76c93053290 is described below

commit 76c930532905c88b3af0c13c7493105f7337ac4e
Author: John Jackson <[email protected]>
AuthorDate: Tue Jun 2 08:21:23 2026 -0700

    Add `GlueCatalogBatchDeletePartitionOperator` (#66721)
---
 providers/amazon/docs/operators/glue_catalog.rst   | 14 +++++
 .../providers/amazon/aws/operators/glue_catalog.py | 68 ++++++++++++++++++++++
 .../system/amazon/aws/example_glue_catalog.py      | 12 ++++
 .../unit/amazon/aws/operators/test_glue_catalog.py | 56 ++++++++++++++++++
 4 files changed, 150 insertions(+)

diff --git a/providers/amazon/docs/operators/glue_catalog.rst b/providers/amazon/docs/operators/glue_catalog.rst
index e8777727940..8fe4a8224ff 100644
--- a/providers/amazon/docs/operators/glue_catalog.rst
+++ b/providers/amazon/docs/operators/glue_catalog.rst
@@ -96,3 +96,17 @@ To create a partition in an AWS Glue Data Catalog table, use
     :dedent: 4
     :start-after: [START howto_operator_glue_catalog_create_partition]
     :end-before: [END howto_operator_glue_catalog_create_partition]
+
+.. _howto/operator:GlueCatalogBatchDeletePartitionOperator:
+
+Batch Delete Partitions
+-----------------------
+
+To delete one or more partitions from an AWS Glue Data Catalog table, use
+:class:`~airflow.providers.amazon.aws.operators.glue_catalog.GlueCatalogBatchDeletePartitionOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_glue_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_glue_catalog_batch_delete_partition]
+    :end-before: [END howto_operator_glue_catalog_batch_delete_partition]
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_catalog.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_catalog.py
index 76141a4d8a4..296f6214e99 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_catalog.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_catalog.py
@@ -24,6 +24,7 @@ from botocore.exceptions import ClientError
 
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
+from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 from airflow.utils.helpers import prune_dict
 
 if TYPE_CHECKING:
@@ -336,3 +337,70 @@ class GlueCatalogCreatePartitionOperator(AwsBaseOperator[AwsBaseHook]):
             else:
                 raise
         self.log.info("Partition created.")
+
+
+class GlueCatalogBatchDeletePartitionOperator(AwsBaseOperator[AwsBaseHook]):
+    """
+    Delete one or more partitions from an AWS Glue Data Catalog table.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GlueCatalogBatchDeletePartitionOperator`
+
+    :param database_name: The name of the database. (templated)
+    :param table_name: The name of the table. (templated)
+    :param partitions_to_delete: List of partition value dicts to delete. (templated)
+    :param catalog_id: The ID of the Data Catalog. Defaults to the account ID. (templated)
+    """
+
+    aws_hook_class = AwsBaseHook
+    template_fields: tuple[str, ...] = aws_template_fields(
+        "database_name", "table_name", "catalog_id", "partitions_to_delete"
+    )
+
+    def __init__(
+        self,
+        *,
+        database_name: str,
+        table_name: str,
+        partitions_to_delete: list[dict[str, list[str]]],
+        catalog_id: str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.database_name = database_name
+        self.table_name = table_name
+        self.partitions_to_delete = partitions_to_delete
+        self.catalog_id = catalog_id
+
+    @property
+    def _hook_parameters(self) -> dict[str, Any]:
+        return {**super()._hook_parameters, "client_type": "glue"}
+
+    def execute(self, context: Context) -> list[dict[str, Any]]:
+        self.log.info(
+            "Deleting %d partitions from %s.%s",
+            len(self.partitions_to_delete),
+            self.database_name,
+            self.table_name,
+        )
+        kwargs: dict[str, Any] = prune_dict(
+            {
+                "DatabaseName": self.database_name,
+                "TableName": self.table_name,
+                "PartitionsToDelete": self.partitions_to_delete,
+                "CatalogId": self.catalog_id,
+            }
+        )
+        response = self.hook.conn.batch_delete_partition(**kwargs)
+        errors = response.get("Errors", [])
+        if errors:
+            # EntityNotFoundException is expected for idempotent deletes
+            real_errors = [
+                e for e in errors if e.get("ErrorDetail", {}).get("ErrorCode") != "EntityNotFoundException"
+            ]
+            if real_errors:
+                raise RuntimeError(f"Failed to delete {len(real_errors)} partition(s): {real_errors}")
+            self.log.info("Some partitions not found (already deleted), continuing.")
+        self.log.info("Batch delete partitions complete.")
+        return errors
diff --git a/providers/amazon/tests/system/amazon/aws/example_glue_catalog.py b/providers/amazon/tests/system/amazon/aws/example_glue_catalog.py
index 6eb736e1eed..7eed9c8df10 100644
--- a/providers/amazon/tests/system/amazon/aws/example_glue_catalog.py
+++ b/providers/amazon/tests/system/amazon/aws/example_glue_catalog.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 from datetime import datetime
 
 from airflow.providers.amazon.aws.operators.glue_catalog import (
+    GlueCatalogBatchDeletePartitionOperator,
     GlueCatalogCreateDatabaseOperator,
     GlueCatalogCreatePartitionOperator,
     GlueCatalogCreateTableOperator,
@@ -106,6 +107,16 @@ with DAG(
     )
     # [END howto_operator_glue_catalog_create_partition]
 
+    # [START howto_operator_glue_catalog_batch_delete_partition]
+    batch_delete_partition = GlueCatalogBatchDeletePartitionOperator(
+        task_id="batch_delete_partition",
+        database_name=db_name,
+        table_name=table_name,
+        partitions_to_delete=[{"Values": ["2024-01-01"]}],
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END howto_operator_glue_catalog_batch_delete_partition]
+
     # [START howto_operator_glue_catalog_delete_table]
     delete_table = GlueCatalogDeleteTableOperator(
         task_id="delete_table",
@@ -120,6 +131,7 @@ with DAG(
         create_database,
         create_table,
         create_partition,
+        batch_delete_partition,
         delete_table,
         delete_database,
     )
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_glue_catalog.py b/providers/amazon/tests/unit/amazon/aws/operators/test_glue_catalog.py
index 6d40e0f36eb..ef9e4bdf498 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_glue_catalog.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_glue_catalog.py
@@ -24,6 +24,7 @@ from botocore.exceptions import ClientError
 
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.amazon.aws.operators.glue_catalog import (
+    GlueCatalogBatchDeletePartitionOperator,
     GlueCatalogCreateDatabaseOperator,
     GlueCatalogCreatePartitionOperator,
     GlueCatalogCreateTableOperator,
@@ -264,3 +265,58 @@ class TestGlueCatalogCreatePartitionOperator:
 
     def test_template_fields(self):
         validate_template_fields(self.operator)
+
+
+PARTITIONS_TO_DELETE = [{"Values": ["2024-01-01"]}, {"Values": ["2024-01-02"]}]
+
+
+class TestGlueCatalogBatchDeletePartitionOperator:
+    def setup_method(self):
+        self.operator = GlueCatalogBatchDeletePartitionOperator(
+            task_id="batch_delete_partition",
+            database_name=DB_NAME,
+            table_name=TABLE_NAME,
+            partitions_to_delete=PARTITIONS_TO_DELETE,
+        )
+
+    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute(self, mock_conn):
+        mock_client = mock.MagicMock()
+        mock_client.batch_delete_partition.return_value = {"Errors": []}
+        mock_conn.return_value = mock_client
+
+        result = self.operator.execute({})
+
+        mock_client.batch_delete_partition.assert_called_once_with(
+            DatabaseName=DB_NAME, TableName=TABLE_NAME, PartitionsToDelete=PARTITIONS_TO_DELETE
+        )
+        assert result == []
+
+    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_with_not_found_errors(self, mock_conn):
+        """EntityNotFoundException is expected (idempotent delete) and should not raise."""
+        mock_client = mock.MagicMock()
+        errors = [
+            {"PartitionValues": ["2024-01-01"], "ErrorDetail": {"ErrorCode": "EntityNotFoundException"}}
+        ]
+        mock_client.batch_delete_partition.return_value = {"Errors": errors}
+        mock_conn.return_value = mock_client
+
+        result = self.operator.execute({})
+        assert result == errors
+
+    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_with_real_errors(self, mock_conn):
+        """Non-EntityNotFoundException errors should raise RuntimeError."""
+        mock_client = mock.MagicMock()
+        errors = [
+            {"PartitionValues": ["2024-01-01"], "ErrorDetail": {"ErrorCode": "InternalServiceException"}}
+        ]
+        mock_client.batch_delete_partition.return_value = {"Errors": errors}
+        mock_conn.return_value = mock_client
+
+        with pytest.raises(RuntimeError, match="Failed to delete 1 partition"):
+            self.operator.execute({})
+
+    def test_template_fields(self):
+        validate_template_fields(self.operator)

Reply via email to