This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d75c079f0a9 Add `S3TablesPutTableBucketPolicyOperator` (#66720)
d75c079f0a9 is described below
commit d75c079f0a957167707b08b09d238f49bf5551f3
Author: John Jackson <[email protected]>
AuthorDate: Tue May 12 07:09:05 2026 -0700
Add `S3TablesPutTableBucketPolicyOperator` (#66720)
---
providers/amazon/docs/operators/s3_tables.rst | 15 +++++++++
.../providers/amazon/aws/operators/s3_tables.py | 36 ++++++++++++++++++++++
.../tests/system/amazon/aws/example_s3_tables.py | 11 +++++++
.../unit/amazon/aws/operators/test_s3_tables.py | 28 +++++++++++++++++
4 files changed, 90 insertions(+)
diff --git a/providers/amazon/docs/operators/s3_tables.rst
b/providers/amazon/docs/operators/s3_tables.rst
index 7f0b6de7dff..d0e1320f7d7 100644
--- a/providers/amazon/docs/operators/s3_tables.rst
+++ b/providers/amazon/docs/operators/s3_tables.rst
@@ -117,6 +117,21 @@ To rename a table in an Amazon S3 Tables namespace, use
:start-after: [START howto_operator_s3tables_rename_table]
:end-before: [END howto_operator_s3tables_rename_table]
+
+.. _howto/operator:S3TablesPutTableBucketPolicyOperator:
+
+Put a Table Bucket Policy
+-------------------------
+
+To set a resource policy on an Amazon S3 Tables table bucket, use
+:class:`~airflow.providers.amazon.aws.operators.s3_tables.S3TablesPutTableBucketPolicyOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_s3_tables.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_s3tables_put_table_bucket_policy]
+ :end-before: [END howto_operator_s3tables_put_table_bucket_policy]
+
Reference
---------
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_tables.py
b/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_tables.py
index 4b2b81572a3..ace71abfc3b 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_tables.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_tables.py
@@ -383,3 +383,39 @@ class
S3TablesRenameTableOperator(AwsBaseOperator[S3TablesHook]):
)
self.hook.conn.rename_table(**kwargs)
self.log.info("Renamed table %s to %s", self.table_name, self.new_name)
+
+
+class S3TablesPutTableBucketPolicyOperator(AwsBaseOperator[S3TablesHook]):
+ """
+ Set a resource policy on an Amazon S3 Tables table bucket.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:S3TablesPutTableBucketPolicyOperator`
+
+ :param table_bucket_arn: The ARN of the table bucket. (templated)
+ :param resource_policy: The JSON resource policy string. (templated)
+ """
+
+ aws_hook_class = S3TablesHook
+ template_fields: Sequence[str] = aws_template_fields("table_bucket_arn",
"resource_policy")
+ template_fields_renderers = {"resource_policy": "json"}
+
+ def __init__(
+ self,
+ *,
+ table_bucket_arn: str,
+ resource_policy: str,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.table_bucket_arn = table_bucket_arn
+ self.resource_policy = resource_policy
+
+ def execute(self, context: Context) -> None:
+ self.log.info("Setting policy on table bucket %s",
self.table_bucket_arn)
+ self.hook.conn.put_table_bucket_policy(
+ tableBucketARN=self.table_bucket_arn,
+ resourcePolicy=self.resource_policy,
+ )
+ self.log.info("Policy set on table bucket %s", self.table_bucket_arn)
diff --git a/providers/amazon/tests/system/amazon/aws/example_s3_tables.py
b/providers/amazon/tests/system/amazon/aws/example_s3_tables.py
index 433e50e2fa0..9c53da23087 100644
--- a/providers/amazon/tests/system/amazon/aws/example_s3_tables.py
+++ b/providers/amazon/tests/system/amazon/aws/example_s3_tables.py
@@ -25,6 +25,7 @@ from airflow.providers.amazon.aws.operators.s3_tables import (
S3TablesDeleteNamespaceOperator,
S3TablesDeleteTableBucketOperator,
S3TablesDeleteTableOperator,
+ S3TablesPutTableBucketPolicyOperator,
S3TablesRenameTableOperator,
)
from airflow.providers.common.compat.sdk import DAG, chain
@@ -81,6 +82,15 @@ with DAG(
table_bucket_name=bucket_name,
)
# [END howto_operator_s3tables_create_table_bucket]
+
+ # [START howto_operator_s3tables_put_table_bucket_policy]
+ put_policy = S3TablesPutTableBucketPolicyOperator(
+ task_id="put_table_bucket_policy",
+ table_bucket_arn=create_table_bucket.output,
+ resource_policy='{"Version":"2012-10-17","Statement":[]}',
+ )
+ # [END howto_operator_s3tables_put_table_bucket_policy]
+
# [START howto_operator_s3tables_create_namespace]
setup_namespace = S3TablesCreateNamespaceOperator(
task_id="create_namespace",
@@ -140,6 +150,7 @@ with DAG(
# TEST SETUP
test_context,
create_table_bucket,
+ put_policy,
setup_namespace,
# TEST BODY
create_table,
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_s3_tables.py
b/providers/amazon/tests/unit/amazon/aws/operators/test_s3_tables.py
index 47a51cec4ab..c6568594293 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_s3_tables.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_s3_tables.py
@@ -31,6 +31,7 @@ from airflow.providers.amazon.aws.operators.s3_tables import (
S3TablesDeleteNamespaceOperator,
S3TablesDeleteTableBucketOperator,
S3TablesDeleteTableOperator,
+ S3TablesPutTableBucketPolicyOperator,
S3TablesRenameTableOperator,
)
@@ -392,3 +393,30 @@ class TestS3TablesRenameTableOperator:
def test_template_fields(self):
validate_template_fields(self.operator)
+
+
+POLICY =
'{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":"*","Action":"s3tables:*","Resource":"*"}]}'
+
+
+class TestS3TablesPutTableBucketPolicyOperator:
+ def setup_method(self):
+ self.operator = S3TablesPutTableBucketPolicyOperator(
+ task_id="put_policy",
+ table_bucket_arn=TABLE_BUCKET_ARN,
+ resource_policy=POLICY,
+ )
+
+ @mock.patch.object(S3TablesHook, "conn", new_callable=mock.PropertyMock)
+ def test_execute(self, mock_conn):
+ mock_client = mock.MagicMock()
+ mock_conn.return_value = mock_client
+
+ self.operator.execute({})
+
+ mock_client.put_table_bucket_policy.assert_called_once_with(
+ tableBucketARN=TABLE_BUCKET_ARN,
+ resourcePolicy=POLICY,
+ )
+
+ def test_template_fields(self):
+ validate_template_fields(self.operator)