SameerMesiah97 commented on code in PR #66957:
URL: https://github.com/apache/airflow/pull/66957#discussion_r3244981700


##########
providers/amazon/tests/unit/amazon/aws/operators/test_s3_tables.py:
##########
@@ -420,3 +421,23 @@ def test_execute(self, mock_conn):
 
     def test_template_fields(self):
         validate_template_fields(self.operator)
+
+
+class TestS3TablesDeleteTableBucketPolicyOperator:
+    def setup_method(self):
+        self.operator = S3TablesDeleteTableBucketPolicyOperator(
+            task_id="delete_policy",
+            table_bucket_arn=TABLE_BUCKET_ARN,
+        )
+
+    @mock.patch.object(S3TablesHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute(self, mock_conn):
+        mock_client = mock.MagicMock()
+        mock_conn.return_value = mock_client
+
+        self.operator.execute({})
+
+        
mock_client.delete_table_bucket_policy.assert_called_once_with(tableBucketARN=TABLE_BUCKET_ARN)

Review Comment:
   I would add this test to cover error propagation in case we receive errors 
from the API side:
   
   ```
   @mock.patch.object(S3TablesHook, "conn", new_callable=mock.PropertyMock)
   def test_execute_propagates_client_error(self, mock_conn):
       mock_client = mock.MagicMock()
       mock_client.delete_table_bucket_policy.side_effect = Exception("boom")
       mock_conn.return_value = mock_client
   
       with pytest.raises(Exception, match="boom"):
           self.operator.execute({})
   ```



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/s3_tables.py:
##########
@@ -419,3 +419,32 @@ def execute(self, context: Context) -> None:
             resourcePolicy=self.resource_policy,
         )
         self.log.info("Policy set on table bucket %s", self.table_bucket_arn)
+
+
+class S3TablesDeleteTableBucketPolicyOperator(AwsBaseOperator[S3TablesHook]):
+    """
+    Delete the resource policy from an Amazon S3 Tables table bucket.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:S3TablesDeleteTableBucketPolicyOperator`
+
+    :param table_bucket_arn: The ARN of the table bucket. (templated)
+    """
+
+    aws_hook_class = S3TablesHook
+    template_fields: Sequence[str] = aws_template_fields("table_bucket_arn")
+
+    def __init__(
+        self,
+        *,
+        table_bucket_arn: str,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.table_bucket_arn = table_bucket_arn
+
+    def execute(self, context: Context) -> None:
+        self.log.info("Deleting policy from table bucket %s", 
self.table_bucket_arn)
+        
self.hook.conn.delete_table_bucket_policy(tableBucketARN=self.table_bucket_arn)

Review Comment:
   Do we know what the underlying S3 Tables API returns when the table bucket 
policy has already been deleted or the ARN does not exist? Just wondering 
whether this operation is effectively idempotent, since concurrent DAG runs 
could potentially attempt to delete the same policy.
   
   This is a non-blocking concern. I am just curious here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to