ashb commented on a change in pull request #8895:
URL: https://github.com/apache/airflow/pull/8895#discussion_r429648649



##########
File path: airflow/providers/amazon/aws/hooks/s3.py
##########
@@ -664,6 +664,26 @@ def copy_object(self,
                                                ACL=acl_policy)
         return response
 
+    @provide_bucket_name
+    def delete_bucket(self, bucket_name: str, force_delete: bool = False) -> None:
+        """
+        To delete s3 bucket, delete all s3 bucket objects and then delete the bucket.
+
+        :param bucket_name: Bucket name
+        :type bucket_name: str
+        :param force_delete: Enable this to delete bucket even if not empty
+        :type force_delete: bool
+        :return: None
+        :rtype: None
+        """
+        if force_delete:
+            bucket_keys = self.list_keys(bucket_name=bucket_name)
+            if bucket_keys:
+                self.delete_objects(bucket=bucket_name, keys=bucket_keys)
+        self.get_conn().delete_bucket(

Review comment:
       ```suggestion
           self.conn.delete_bucket(
       ```

   I think `self.conn` should work here.
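
For illustration, a rough sketch of how the tail of `delete_bucket` could look with that suggestion applied (sketch only; assumes `AwsBaseHook` exposes the cached `conn` property, and boto3's S3 client takes a `Bucket` keyword):

```python
# Sketch only: the suggested change inside S3Hook.delete_bucket, reusing the
# cached boto3 client (self.conn) instead of building a new one via get_conn().
@provide_bucket_name
def delete_bucket(self, bucket_name: str, force_delete: bool = False) -> None:
    if force_delete:
        bucket_keys = self.list_keys(bucket_name=bucket_name)
        if bucket_keys:
            self.delete_objects(bucket=bucket_name, keys=bucket_keys)
    self.conn.delete_bucket(Bucket=bucket_name)
```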

##########
File path: airflow/providers/amazon/aws/operators/s3_bucket.py
##########
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module contains AWS S3 operators.
+"""
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+class S3CreateBucketOperator(BaseOperator):
+    """
+    This operator creates an S3 bucket
+
+    :param bucket_name: This is bucket name you want to create
+    :type bucket_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    :param region_name: AWS region_name. If not specified fetched from connection.
+    :type region_name: Optional[str]
+    """
+    def __init__(self,
+                 bucket_name,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 region_name: Optional[str] = None,
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.region_name = region_name
+        self.s3_hook = S3Hook(aws_conn_id=aws_conn_id, region_name=region_name)

Review comment:
       Do not create the hook in the operator constructor, only inside `execute`. Instead, store these params on `self` here.
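
Roughly what I mean, as a sketch only (params kept on `self`, hook built at run time inside `execute`):

```python
# Sketch only: the constructor stores connection params; the hook (and its
# boto3 client) is only created when the task actually runs.
from typing import Optional

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3CreateBucketOperator(BaseOperator):
    def __init__(self,
                 bucket_name: str,
                 aws_conn_id: Optional[str] = "aws_default",
                 region_name: Optional[str] = None,
                 *args,
                 **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.region_name = region_name
        self.aws_conn_id = aws_conn_id  # stored, not turned into a hook yet

    def execute(self, context):
        # The hook is instantiated on the worker, at task run time.
        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
        s3_hook.create_bucket(bucket_name=self.bucket_name, region_name=self.region_name)
        self.log.info("Created bucket with name: %s", self.bucket_name)
```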

##########
File path: airflow/providers/amazon/aws/hooks/s3.py
##########
@@ -664,6 +664,26 @@ def copy_object(self,
                                                ACL=acl_policy)
         return response
 
+    @provide_bucket_name
+    def delete_bucket(self, bucket_name: str, force_delete: bool = False) -> None:
+        """
+        To delete s3 bucket, delete all s3 bucket objects and then delete the bucket.
+
+        :param bucket_name: Bucket name
+        :type bucket_name: str
+        :param force_delete: Enable this to delete bucket even if not empty

Review comment:
       ```suggestion
           :param force_delete: Forcibly delete all objects in the bucket before deleting the bucket
       ```

##########
File path: airflow/providers/amazon/aws/operators/s3_bucket.py
##########
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module contains AWS S3 operators.
+"""
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+class S3CreateBucketOperator(BaseOperator):
+    """
+    This operator creates an S3 bucket
+
+    :param bucket_name: This is bucket name you want to create
+    :type bucket_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    :param region_name: AWS region_name. If not specified fetched from connection.
+    :type region_name: Optional[str]
+    """
+    def __init__(self,
+                 bucket_name,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 region_name: Optional[str] = None,
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.region_name = region_name
+        self.s3_hook = S3Hook(aws_conn_id=aws_conn_id, region_name=region_name)
+
+    def execute(self, context):
+        self.check_for_bucket(self.bucket_name)
+        self.s3_hook.create_bucket(bucket_name=self.bucket_name, region_name=self.region_name)
+        self.log.info("Created bucket with name: %s", self.bucket_name)
+
+    def check_for_bucket(self, bucket_name: str) -> None:
+        """
+        Exception is raised if bucket already exists.
+
+        :param bucket_name: Bucket name
+        :type bucket_name: str
+        """
+        if self.s3_hook.check_for_bucket(bucket_name):
+            raise AirflowException(f"The bucket name {bucket_name} already exists")
+
+
+class S3DeleteBucketOperator(BaseOperator):
+    """
+    This operator deletes an S3 bucket
+
+    :param bucket_name: This is bucket name you want to create
+    :type bucket_name: str
+    :param force_delete: Enable this to delete bucket even if not empty
+    :type force_delete: bool
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    """
+    def __init__(self,
+                 bucket_name,
+                 force_delete: Optional[bool] = False,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.force_delete = force_delete
+        self.s3_hook = S3Hook(aws_conn_id=aws_conn_id)
+
+    def execute(self, context):
+        self.check_for_bucket(self.bucket_name)
+        self.s3_hook.delete_bucket(bucket_name=self.bucket_name, force_delete=self.force_delete)
+        self.log.info("Deleted bucket with name: %s", self.bucket_name)
+
+    def check_for_bucket(self, bucket_name: str) -> None:
+        """
+        An exception is raised if bucket doesn't exist.
+
+        :param bucket_name: Bucket name
+        :type bucket_name: str
+        """
+        if not self.s3_hook.check_for_bucket(bucket_name):

Review comment:
       Same here

##########
File path: airflow/providers/amazon/aws/operators/s3_bucket.py
##########
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module contains AWS S3 operators.
+"""
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+class S3CreateBucketOperator(BaseOperator):
+    """
+    This operator creates an S3 bucket
+
+    :param bucket_name: This is bucket name you want to create
+    :type bucket_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    :param region_name: AWS region_name. If not specified fetched from connection.
+    :type region_name: Optional[str]
+    """
+    def __init__(self,
+                 bucket_name,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 region_name: Optional[str] = None,
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.region_name = region_name
+        self.s3_hook = S3Hook(aws_conn_id=aws_conn_id, region_name=region_name)
+
+    def execute(self, context):
+        self.check_for_bucket(self.bucket_name)
+        self.s3_hook.create_bucket(bucket_name=self.bucket_name, region_name=self.region_name)
+        self.log.info("Created bucket with name: %s", self.bucket_name)
+
+    def check_for_bucket(self, bucket_name: str) -> None:
+        """
+        Exception is raised if bucket already exists.
+
+        :param bucket_name: Bucket name
+        :type bucket_name: str
+        """
+        if self.s3_hook.check_for_bucket(bucket_name):
+            raise AirflowException(f"The bucket name {bucket_name} already exists")

Review comment:
       I do not think Create Bucket should fail when the bucket already exists. In that case the operator has already done its desired job.
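
Something along these lines would keep the operator idempotent (sketch only; assumes the hook is built in `execute` and `aws_conn_id` is stored on `self`, per the earlier comment):

```python
# Sketch only: S3CreateBucketOperator.execute treating an existing bucket as
# success rather than raising.
def execute(self, context):
    s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
    if s3_hook.check_for_bucket(self.bucket_name):
        self.log.info("Bucket %s already exists, nothing to do", self.bucket_name)
        return
    s3_hook.create_bucket(bucket_name=self.bucket_name, region_name=self.region_name)
    self.log.info("Created bucket with name: %s", self.bucket_name)
```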

##########
File path: airflow/providers/amazon/aws/operators/s3_bucket.py
##########
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module contains AWS S3 operators.
+"""
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+class S3CreateBucketOperator(BaseOperator):
+    """
+    This operator creates an S3 bucket
+
+    :param bucket_name: This is bucket name you want to create
+    :type bucket_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    :param region_name: AWS region_name. If not specified fetched from connection.
+    :type region_name: Optional[str]
+    """
+    def __init__(self,
+                 bucket_name,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 region_name: Optional[str] = None,
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.region_name = region_name
+        self.s3_hook = S3Hook(aws_conn_id=aws_conn_id, region_name=region_name)
+
+    def execute(self, context):
+        self.check_for_bucket(self.bucket_name)
+        self.s3_hook.create_bucket(bucket_name=self.bucket_name, region_name=self.region_name)
+        self.log.info("Created bucket with name: %s", self.bucket_name)
+
+    def check_for_bucket(self, bucket_name: str) -> None:
+        """
+        Exception is raised if bucket already exists.
+
+        :param bucket_name: Bucket name
+        :type bucket_name: str
+        """
+        if self.s3_hook.check_for_bucket(bucket_name):
+            raise AirflowException(f"The bucket name {bucket_name} already exists")
+
+
+class S3DeleteBucketOperator(BaseOperator):
+    """
+    This operator deletes an S3 bucket
+
+    :param bucket_name: This is bucket name you want to create
+    :type bucket_name: str
+    :param force_delete: Enable this to delete bucket even if not empty
+    :type force_delete: bool
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    """
+    def __init__(self,
+                 bucket_name,
+                 force_delete: Optional[bool] = False,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.force_delete = force_delete
+        self.s3_hook = S3Hook(aws_conn_id=aws_conn_id)

Review comment:
       Same here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

