ashb commented on a change in pull request #8895:
URL: https://github.com/apache/airflow/pull/8895#discussion_r426569097



##########
File path: airflow/providers/amazon/aws/hooks/s3.py
##########
@@ -664,6 +664,23 @@ def copy_object(self,
                                                ACL=acl_policy)
         return response
 
+    @provide_bucket_name
+    def delete_bucket(self, bucket_name) -> None:
+        """
+        To delete an S3 bucket, delete all S3 bucket objects and then delete the bucket.
+
+        :param bucket_name: Bucket name
+        :type bucket_name: str
+        :return: None
+        :rtype: None
+        """
+        bucket_keys = self.list_keys(bucket_name=bucket_name)
+        if bucket_keys:

Review comment:
       No, do not do this by default.
   
   This must be behind a `force_delete` flag.
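   
    A rough sketch of what that could look like, reusing the hook's existing `list_keys`/`delete_objects` helpers (naming here is illustrative, not final):
   
    ```python
    @provide_bucket_name
    def delete_bucket(self, bucket_name: str, force_delete: bool = False) -> None:
        """
        Delete the bucket; only empty it out first when ``force_delete=True``.
        """
        if force_delete:
            bucket_keys = self.list_keys(bucket_name=bucket_name)
            if bucket_keys:
                self.delete_objects(bucket=bucket_name, keys=bucket_keys)
        self.get_conn().delete_bucket(Bucket=bucket_name)
    ```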

##########
File path: airflow/providers/amazon/aws/operators/s3_bucket.py
##########
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module contains AWS S3 operators.
+"""
+from typing import Optional, Union
+
+from botocore.config import Config
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+class S3CreateBucketOperator(BaseOperator):
+    """
+    This operator creates an S3 bucket
+
+    :param bucket_name: This is the bucket name you want to create
+    :type bucket_name: Optional[str]
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    :param verify: Whether or not to verify SSL certificates.
+    :type verify: Union[bool, str, None]
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.

Review comment:
       Correction: if not specified, the behaviour is whatever the connection is configured to use. See http://airflow.apache.org/docs/stable/howto/connection/aws.html
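   
    Something along these lines would be more accurate (wording is only a suggestion):
   
    ```
    :param region_name: AWS region_name. If not specified, the region configured
        on the AWS connection is used; the default boto3 behaviour only applies
        when the connection does not set a region either.
    :type region_name: Optional[str]
    ```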

##########
File path: airflow/providers/amazon/aws/operators/s3_bucket.py
##########
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module contains AWS S3 operators.
+"""
+from typing import Optional, Union
+
+from botocore.config import Config
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+class S3CreateBucketOperator(BaseOperator):
+    """
+    This operator creates an S3 bucket
+
+    :param bucket_name: This is the bucket name you want to create
+    :type bucket_name: Optional[str]
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    :param verify: Whether or not to verify SSL certificates.
+    :type verify: Union[bool, str, None]
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :type region_name: Optional[str]
+    :param config: Configuration for botocore client.
+        (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html)
+    :type config: Optional[botocore.client.Config]
+    """
+    def __init__(self,
+                 bucket_name,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 verify: Union[bool, str, None] = None,
+                 region_name: Optional[str] = None,
+                 config: Optional[Config] = None,
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.region_name = region_name
+        self.s3_hook = S3Hook(aws_conn_id=aws_conn_id,
+                              verify=verify,
+                              region_name=region_name,
+                              config=config)
+
+    def execute(self, context):
+        if not self.check_for_bucket(self.bucket_name):
+            self.s3_hook.create_bucket(bucket_name=self.bucket_name, region_name=self.region_name)
+            self.log.info("Created bucket with name: %s", self.bucket_name)
+        else:
+            self.log.info("The bucket name %s already exists", 
self.bucket_name)
+
+    def check_for_bucket(self, bucket_name: str) -> bool:
+        """
+        Override this method if you want to raise an exception when the bucket already exists.
+        """
+        if self.s3_hook.check_for_bucket(bucket_name):
+            return True
+        return False
+
+
+class S3DeleteBucketOperator(BaseOperator):
+    """
+    This operator deletes an S3 bucket
+
+    :param bucket_name: This is the bucket name you want to delete
+    :type bucket_name: Optional[str]
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    :param verify: Whether or not to verify SSL certificates.
+    :type verify: Union[bool, str, None]
+    :param config: Configuration for botocore client.
+        
(https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html)
+    :type config: Optional[botocore.client.Config]
+    """
+    def __init__(self,
+                 bucket_name,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 verify: Union[bool, str, None] = None,
+                 config: Optional[Config] = None,
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.s3_hook = S3Hook(aws_conn_id=aws_conn_id,
+                              verify=verify,
+                              config=config)
+
+    def execute(self, context):
+        if not self.check_for_bucket(self.bucket_name):
+            raise AirflowException(f"The bucket name {self.bucket_name} doesn't exist")

Review comment:
       If Create doesn't fail by default when the bucket already exists, then this should likewise not fail when the bucket doesn't exist.
   
    If you think of this operator as ensuring the bucket doesn't exist, then the bucket not existing is not fatal.
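   
    For example, `execute()` could just log and return, assuming the operator also grows a `force_delete` flag to match the hook change suggested above (sketch only):
   
    ```python
    def execute(self, context):
        if not self.s3_hook.check_for_bucket(self.bucket_name):
            # a missing bucket is already the desired end state, not an error
            self.log.info("Bucket %s does not exist, nothing to do", self.bucket_name)
            return
        self.s3_hook.delete_bucket(bucket_name=self.bucket_name, force_delete=self.force_delete)
        self.log.info("Deleted bucket: %s", self.bucket_name)
    ```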

##########
File path: airflow/providers/amazon/aws/example_dags/example_s3.py
##########
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models.dag import DAG
+from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.utils.dates import days_ago
+
+DAG_NAME = 'aws_s3_dag'
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': True,
+    'start_date': days_ago(2)
+}
+
+with DAG(
+    dag_id=DAG_NAME,
+    schedule_interval=None,
+    default_args=default_args
+) as dag:
+
+    create_bucket = S3CreateBucketOperator(
+        task_id='aws_s3_create',
+        bucket_name='test-airflow-12345',
+        region_name='us-east-1',
+    )
+
+    delete_bucket = S3DeleteBucketOperator(

Review comment:
       Since this will delete objects from the bucket, you should put an object into it before deleting.
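   
    Something like this would do, e.g. a small `PythonOperator` writing a throw-away object via the hook's `load_string()` (task id, key and payload below are just placeholders):
   
    ```python
    from airflow.operators.python import PythonOperator
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook


    def upload_test_object():
        # write a small object so the delete step actually has something to clean up
        S3Hook().load_string(string_data="test", key="test-object", bucket_name='test-airflow-12345')


    put_object = PythonOperator(
        task_id='aws_s3_put_object',
        python_callable=upload_test_object,
    )

    create_bucket >> put_object >> delete_bucket
    ```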

##########
File path: airflow/providers/amazon/aws/operators/s3_bucket.py
##########
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module contains AWS S3 operators.
+"""
+from typing import Optional, Union
+
+from botocore.config import Config
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+class S3CreateBucketOperator(BaseOperator):
+    """
+    This operator creates an S3 bucket
+
+    :param bucket_name: This is the bucket name you want to create
+    :type bucket_name: Optional[str]
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    :param verify: Whether or not to verify SSL certificates.
+    :type verify: Union[bool, str, None]
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :type region_name: Optional[str]
+    :param config: Configuration for botocore client.
+        (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html)
+    :type config: Optional[botocore.client.Config]
+    """
+    def __init__(self,
+                 bucket_name,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 verify: Union[bool, str, None] = None,

Review comment:
       I don't know where you copied this from, but it should be a property of the connection, not the operator.
   
    Remove this.
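   
    i.e. drop `verify` from the constructor and let the hook pick up anything TLS-related from the connection (rough sketch):
   
    ```python
    def __init__(self,
                 bucket_name,
                 aws_conn_id: Optional[str] = "aws_default",
                 region_name: Optional[str] = None,
                 config: Optional[Config] = None,
                 *args,
                 **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.region_name = region_name
        # no verify argument here: TLS verification is configured on the AWS connection
        self.s3_hook = S3Hook(aws_conn_id=aws_conn_id,
                              region_name=region_name,
                              config=config)
    ```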

##########
File path: airflow/providers/amazon/aws/example_dags/example_s3.py
##########
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models.dag import DAG
+from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.utils.dates import days_ago
+
+DAG_NAME = 'aws_s3_dag'

Review comment:
       Should the filename not match this?
   
   Additionally: `s3_bucket_dag`



