This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 76c93053290 Add `GlueCatalogBatchDeletePartitionOperator` (#66721)
76c93053290 is described below
commit 76c930532905c88b3af0c13c7493105f7337ac4e
Author: John Jackson <[email protected]>
AuthorDate: Tue Jun 2 08:21:23 2026 -0700
Add `GlueCatalogBatchDeletePartitionOperator` (#66721)
---
providers/amazon/docs/operators/glue_catalog.rst | 14 +++++
.../providers/amazon/aws/operators/glue_catalog.py | 68 ++++++++++++++++++++++
.../system/amazon/aws/example_glue_catalog.py | 12 ++++
.../unit/amazon/aws/operators/test_glue_catalog.py | 56 ++++++++++++++++++
4 files changed, 150 insertions(+)
diff --git a/providers/amazon/docs/operators/glue_catalog.rst
b/providers/amazon/docs/operators/glue_catalog.rst
index e8777727940..8fe4a8224ff 100644
--- a/providers/amazon/docs/operators/glue_catalog.rst
+++ b/providers/amazon/docs/operators/glue_catalog.rst
@@ -96,3 +96,17 @@ To create a partition in an AWS Glue Data Catalog table, use
:dedent: 4
:start-after: [START howto_operator_glue_catalog_create_partition]
:end-before: [END howto_operator_glue_catalog_create_partition]
+
+.. _howto/operator:GlueCatalogBatchDeletePartitionOperator:
+
+Batch Delete Partitions
+-----------------------
+
+To delete one or more partitions from an AWS Glue Data Catalog table, use
+:class:`~airflow.providers.amazon.aws.operators.glue_catalog.GlueCatalogBatchDeletePartitionOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_glue_catalog.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_glue_catalog_batch_delete_partition]
+ :end-before: [END howto_operator_glue_catalog_batch_delete_partition]
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_catalog.py
b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_catalog.py
index 76141a4d8a4..296f6214e99 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_catalog.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_catalog.py
@@ -24,6 +24,7 @@ from botocore.exceptions import ClientError
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
+from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
from airflow.utils.helpers import prune_dict
if TYPE_CHECKING:
@@ -336,3 +337,70 @@ class
GlueCatalogCreatePartitionOperator(AwsBaseOperator[AwsBaseHook]):
else:
raise
self.log.info("Partition created.")
+
+
class GlueCatalogBatchDeletePartitionOperator(AwsBaseOperator[AwsBaseHook]):
    """
    Delete one or more partitions from an AWS Glue Data Catalog table.

    The Glue ``BatchDeletePartition`` API accepts at most 25 entries in
    ``PartitionsToDelete`` per request, so ``partitions_to_delete`` is split into
    chunks of that size and one request is issued per chunk. Errors from every
    chunk are collected before deciding the outcome: partitions reported as
    ``EntityNotFoundException`` are treated as already deleted, while any other
    per-partition error fails the task once all chunks have been attempted.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GlueCatalogBatchDeletePartitionOperator`

    :param database_name: The name of the database. (templated)
    :param table_name: The name of the table. (templated)
    :param partitions_to_delete: List of partition value dicts to delete, e.g.
        ``[{"Values": ["2024-01-01"]}]``. (templated)
    :param catalog_id: The ID of the Data Catalog. Defaults to the account ID. (templated)
    :return: The per-partition error entries returned by Glue. Any non-``EntityNotFoundException``
        error raises instead, so the returned list contains only "not found" entries.
    """

    aws_hook_class = AwsBaseHook
    template_fields: tuple[str, ...] = aws_template_fields(
        "database_name", "table_name", "catalog_id", "partitions_to_delete"
    )

    # Maximum number of entries the Glue BatchDeletePartition API accepts in a single request.
    MAX_PARTITIONS_PER_REQUEST = 25

    def __init__(
        self,
        *,
        database_name: str,
        table_name: str,
        partitions_to_delete: list[dict[str, list[str]]],
        catalog_id: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.database_name = database_name
        self.table_name = table_name
        self.partitions_to_delete = partitions_to_delete
        self.catalog_id = catalog_id

    @property
    def _hook_parameters(self) -> dict[str, Any]:
        return {**super()._hook_parameters, "client_type": "glue"}

    def execute(self, context: Context) -> list[dict[str, Any]]:
        self.log.info(
            "Deleting %d partitions from %s.%s",
            len(self.partitions_to_delete),
            self.database_name,
            self.table_name,
        )
        client = self.hook.conn
        batch_size = self.MAX_PARTITIONS_PER_REQUEST
        errors: list[dict[str, Any]] = []
        # Issue one request per chunk so lists longer than the API limit do not fail validation.
        for start in range(0, len(self.partitions_to_delete), batch_size):
            request: dict[str, Any] = prune_dict(
                {
                    "DatabaseName": self.database_name,
                    "TableName": self.table_name,
                    "PartitionsToDelete": self.partitions_to_delete[start : start + batch_size],
                    "CatalogId": self.catalog_id,
                }
            )
            response = client.batch_delete_partition(**request)
            errors.extend(response.get("Errors", []))

        # EntityNotFoundException is expected for idempotent deletes; anything else is a failure.
        real_errors = [
            error
            for error in errors
            if error.get("ErrorDetail", {}).get("ErrorCode") != "EntityNotFoundException"
        ]
        if real_errors:
            raise RuntimeError(f"Failed to delete {len(real_errors)} partition(s): {real_errors}")
        if errors:
            self.log.info("Some partitions not found (already deleted), continuing.")
        self.log.info("Batch delete partitions complete.")
        return errors
diff --git a/providers/amazon/tests/system/amazon/aws/example_glue_catalog.py
b/providers/amazon/tests/system/amazon/aws/example_glue_catalog.py
index 6eb736e1eed..7eed9c8df10 100644
--- a/providers/amazon/tests/system/amazon/aws/example_glue_catalog.py
+++ b/providers/amazon/tests/system/amazon/aws/example_glue_catalog.py
@@ -19,6 +19,7 @@ from __future__ import annotations
from datetime import datetime
from airflow.providers.amazon.aws.operators.glue_catalog import (
+ GlueCatalogBatchDeletePartitionOperator,
GlueCatalogCreateDatabaseOperator,
GlueCatalogCreatePartitionOperator,
GlueCatalogCreateTableOperator,
@@ -106,6 +107,16 @@ with DAG(
)
# [END howto_operator_glue_catalog_create_partition]
+ # [START howto_operator_glue_catalog_batch_delete_partition]
+ batch_delete_partition = GlueCatalogBatchDeletePartitionOperator(
+ task_id="batch_delete_partition",
+ database_name=db_name,
+ table_name=table_name,
+ partitions_to_delete=[{"Values": ["2024-01-01"]}],
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+ # [END howto_operator_glue_catalog_batch_delete_partition]
+
# [START howto_operator_glue_catalog_delete_table]
delete_table = GlueCatalogDeleteTableOperator(
task_id="delete_table",
@@ -120,6 +131,7 @@ with DAG(
create_database,
create_table,
create_partition,
+ batch_delete_partition,
delete_table,
delete_database,
)
diff --git
a/providers/amazon/tests/unit/amazon/aws/operators/test_glue_catalog.py
b/providers/amazon/tests/unit/amazon/aws/operators/test_glue_catalog.py
index 6d40e0f36eb..ef9e4bdf498 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_glue_catalog.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_glue_catalog.py
@@ -24,6 +24,7 @@ from botocore.exceptions import ClientError
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.operators.glue_catalog import (
+ GlueCatalogBatchDeletePartitionOperator,
GlueCatalogCreateDatabaseOperator,
GlueCatalogCreatePartitionOperator,
GlueCatalogCreateTableOperator,
@@ -264,3 +265,58 @@ class TestGlueCatalogCreatePartitionOperator:
def test_template_fields(self):
validate_template_fields(self.operator)
+
+
PARTITIONS_TO_DELETE = [{"Values": ["2024-01-01"]}, {"Values": ["2024-01-02"]}]


class TestGlueCatalogBatchDeletePartitionOperator:
    """Unit tests for ``GlueCatalogBatchDeletePartitionOperator``."""

    def setup_method(self):
        self.operator = GlueCatalogBatchDeletePartitionOperator(
            task_id="batch_delete_partition",
            database_name=DB_NAME,
            table_name=TABLE_NAME,
            partitions_to_delete=PARTITIONS_TO_DELETE,
        )

    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
    def test_execute(self, mock_conn):
        mock_client = mock.MagicMock()
        mock_client.batch_delete_partition.return_value = {"Errors": []}
        mock_conn.return_value = mock_client

        result = self.operator.execute({})

        # catalog_id is None, so prune_dict must drop CatalogId from the request.
        mock_client.batch_delete_partition.assert_called_once_with(
            DatabaseName=DB_NAME, TableName=TABLE_NAME, PartitionsToDelete=PARTITIONS_TO_DELETE
        )
        assert result == []

    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
    def test_execute_with_catalog_id(self, mock_conn):
        """An explicit ``catalog_id`` is forwarded to Glue as ``CatalogId``."""
        mock_client = mock.MagicMock()
        mock_client.batch_delete_partition.return_value = {"Errors": []}
        mock_conn.return_value = mock_client
        operator = GlueCatalogBatchDeletePartitionOperator(
            task_id="batch_delete_partition_with_catalog",
            database_name=DB_NAME,
            table_name=TABLE_NAME,
            partitions_to_delete=PARTITIONS_TO_DELETE,
            catalog_id="123456789012",
        )

        operator.execute({})

        mock_client.batch_delete_partition.assert_called_once_with(
            DatabaseName=DB_NAME,
            TableName=TABLE_NAME,
            PartitionsToDelete=PARTITIONS_TO_DELETE,
            CatalogId="123456789012",
        )

    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
    def test_execute_without_errors_key(self, mock_conn):
        """A response without an ``Errors`` key is treated as full success."""
        mock_client = mock.MagicMock()
        mock_client.batch_delete_partition.return_value = {}
        mock_conn.return_value = mock_client

        assert self.operator.execute({}) == []

    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
    def test_execute_with_not_found_errors(self, mock_conn):
        """EntityNotFoundException is expected (idempotent delete) and should not raise."""
        mock_client = mock.MagicMock()
        errors = [
            {"PartitionValues": ["2024-01-01"], "ErrorDetail": {"ErrorCode": "EntityNotFoundException"}}
        ]
        mock_client.batch_delete_partition.return_value = {"Errors": errors}
        mock_conn.return_value = mock_client

        result = self.operator.execute({})

        mock_client.batch_delete_partition.assert_called_once()
        assert result == errors

    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
    def test_execute_with_real_errors(self, mock_conn):
        """Non-EntityNotFoundException errors should raise RuntimeError."""
        mock_client = mock.MagicMock()
        errors = [
            {"PartitionValues": ["2024-01-01"], "ErrorDetail": {"ErrorCode": "InternalServiceException"}}
        ]
        mock_client.batch_delete_partition.return_value = {"Errors": errors}
        mock_conn.return_value = mock_client

        with pytest.raises(RuntimeError, match="Failed to delete 1 partition"):
            self.operator.execute({})

    def test_template_fields(self):
        validate_template_fields(self.operator)