dstandish commented on a change in pull request #13072:
URL: https://github.com/apache/airflow/pull/13072#discussion_r548396143



##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,181 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+    :param poll_interval: Time (in seconds) to wait between two consecutive 
calls to check crawler status
+    :type poll_interval: int
+    :param config = Configurations for the AWS Glue crawler
+    :type config = Optional[dict]
+    """
+
+    def __init__(self, poll_interval, *args, **kwargs):
+        self.poll_interval = poll_interval
+        kwargs['client_type'] = 'glue'
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def glue_client(self):
+        """:return: AWS Glue client"""
+        return self.get_conn()
+
+    def get_iam_execution_role(self, role_name) -> str:
+        """:return: iam role for crawler execution"""
+        iam_client = self.get_client_type('iam', self.region_name)
+
+        try:
+            glue_execution_role = iam_client.get_role(RoleName=role_name)
+            self.log.info("Iam Role Name: %s", role_name)
+            return glue_execution_role
+        except Exception as general_error:
+            self.log.error("Failed to create aws glue crawler, error: %s", 
general_error)
+            raise
+
+    def get_or_create_crawler(self, config) -> str:
+        """
+        Creates the crawler if the crawler doesn't exists and returns the 
crawler name
+
+        :param config = Configurations for the AWS Glue crawler
+        :type config = Optional[dict]
+        :return: Name of the crawler
+        """
+        self.get_iam_execution_role(config["Role"])
+
+        try:
+            self.glue_client.get_crawler(**config)
+            self.log.info("Crawler already exists")
+            try:
+                self.glue_client.update_crawler(**config)
+                return config["Name"]
+            except Exception as general_error:
+                self.log.error("Failed to update aws glue crawler, error: %s", 
general_error)
+                raise
+        except self.glue_client.exceptions.EntityNotFoundException:
+            self.log.info("Creating AWS Glue crawler")
+            try:
+                self.glue_client.create_crawler(**config)
+                return config["Name"]
+            except Exception as general_error:
+                self.log.error("Failed to create aws glue crawler, error: %s", 
general_error)
+                raise

Review comment:
       The nested try blocks get a bit hard to decipher.  
   
   ```suggestion
           crawler_name = config["Name"]
           try:
               self.glue_client.get_crawler(Name=crawler_name)
               self.log.info(f"Crawler '{crawler_name}' already exists; 
updating crawler.")
               self.glue_client.update_crawler(**config)
           except self.glue_client.exceptions.EntityNotFoundException:
               self.log.info("Creating AWS Glue crawler")
               self.glue_client.create_crawler(**config)
           return crawler_name
   ```
   
    I think it's better to remove the try/except when you are just catching and 
immediately re-raising, because it impairs readability without benefit --- it will 
be clear enough in the logs that the operation failed.

##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,181 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+    :param poll_interval: Time (in seconds) to wait between two consecutive 
calls to check crawler status
+    :type poll_interval: int
+    :param config = Configurations for the AWS Glue crawler
+    :type config = Optional[dict]
+    """
+
+    def __init__(self, poll_interval, *args, **kwargs):
+        self.poll_interval = poll_interval
+        kwargs['client_type'] = 'glue'
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def glue_client(self):
+        """:return: AWS Glue client"""
+        return self.get_conn()
+
+    def get_iam_execution_role(self, role_name) -> str:
+        """:return: iam role for crawler execution"""
+        iam_client = self.get_client_type('iam', self.region_name)
+
+        try:
+            glue_execution_role = iam_client.get_role(RoleName=role_name)
+            self.log.info("Iam Role Name: %s", role_name)
+            return glue_execution_role
+        except Exception as general_error:
+            self.log.error("Failed to create aws glue crawler, error: %s", 
general_error)

Review comment:
       The error message is not correct for this method... but also, at the risk 
of being a broken record :), I'm not sure it's necessary to catch here.

##########
File path: tests/providers/amazon/aws/hooks/test_glue_crawler.py
##########
@@ -0,0 +1,142 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+import unittest
+from unittest import mock
+
+from cached_property import cached_property
+
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+
+try:
+    from moto import mock_iam
+except ImportError:
+    mock_iam = None
+
+mock_crawler_name = 'test-crawler'
+mock_role_name = 'test-role'
+mock_config = {
+    'Name': mock_crawler_name,
+    'Description': 'Test glue crawler from Airflow',
+    'DatabaseName': 'test_db',
+    'Role': mock_role_name,
+    'Targets': {
+        'S3Targets': [
+            {
+                'Path': 's3://test-glue-crawler/foo/',
+                'Exclusions': [
+                    's3://test-glue-crawler/bar/',
+                ],
+                'ConnectionName': 'test-s3-conn',
+            }
+        ],
+        'JdbcTargets': [
+            {
+                'ConnectionName': 'test-jdbc-conn',
+                'Path': 'test_db/test_table>',
+                'Exclusions': [
+                    'string',
+                ],
+            }
+        ],
+        'MongoDBTargets': [
+            {'ConnectionName': 'test-mongo-conn', 'Path': 
'test_db/test_collection', 'ScanAll': True}
+        ],
+        'DynamoDBTargets': [{'Path': 'test_db/test_table', 'scanAll': True, 
'scanRate': 123.0}],
+        'CatalogTargets': [
+            {
+                'DatabaseName': 'test_glue_db',
+                'Tables': [
+                    'test',
+                ],
+            }
+        ],
+    },
+    'Classifiers': ['test-classifier'],
+    'TablePrefix': 'test',
+    'SchemaChangePolicy': {
+        'UpdateBehavior': 'UPDATE_IN_DATABASE',
+        'DeleteBehavior': 'DEPRECATE_IN_DATABASE',
+    },
+    'RecrawlPolicy': {'RecrawlBehavior': 'CRAWL_EVERYTHING'},
+    'LineageConfiguration': 'ENABLE',
+    'Configuration': """
+    {
+        "Version": 1.0,
+        "CrawlerOutput": {
+            "Partitions": { "AddOrUpdateBehavior": "InheritFromTable" }
+        }
+    }
+    """,
+    'SecurityConfiguration': 'test',
+    'Tags': {'test': 'foo'},
+}
+
+
+class TestAwsGlueCrawlerHook(unittest.TestCase):
+    @cached_property
+    def setUp(self):
+        self.hook = AwsGlueCrawlerHook(aws_conn_id="aws_default", 
poll_interval=5)
+
+    @unittest.skipIf(mock_iam is None, 'mock_iam package not present')
+    @mock_iam
+    def test_get_iam_execution_role(self):
+        iam_role = self.hook.get_client_type('iam').create_role(
+            Path="/",
+            RoleName=mock_role_name,
+            AssumeRolePolicyDocument=json.dumps(
+                {
+                    "Version": "2012-10-17",
+                    "Statement": {
+                        "Effect": "Allow",
+                        "Principal": {"Service": "glue.amazonaws.com"},
+                        "Action": "sts:AssumeRole",
+                    },
+                }
+            ),
+        )
+        iam_role = self.hook.get_iam_execution_role(role_name=mock_role_name)
+
+        self.assertIsNotNone(iam_role)
+
+    @mock.patch.object(AwsGlueCrawlerHook, "get_iam_execution_role")
+    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
+    def test_get_or_create_crawler(self, mock_get_conn, 
mock_get_iam_execution_role):
+        mock_get_iam_execution_role.return_value = 
mock.MagicMock(Role={'RoleName': mock_role_name})

Review comment:
       Does this have any effect?

##########
File path: tests/providers/amazon/aws/hooks/test_glue_crawler.py
##########
@@ -0,0 +1,142 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+import unittest
+from unittest import mock
+
+from cached_property import cached_property
+
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+
+try:
+    from moto import mock_iam
+except ImportError:
+    mock_iam = None
+
+mock_crawler_name = 'test-crawler'
+mock_role_name = 'test-role'
+mock_config = {
+    'Name': mock_crawler_name,
+    'Description': 'Test glue crawler from Airflow',
+    'DatabaseName': 'test_db',
+    'Role': mock_role_name,
+    'Targets': {
+        'S3Targets': [
+            {
+                'Path': 's3://test-glue-crawler/foo/',
+                'Exclusions': [
+                    's3://test-glue-crawler/bar/',
+                ],
+                'ConnectionName': 'test-s3-conn',
+            }
+        ],
+        'JdbcTargets': [
+            {
+                'ConnectionName': 'test-jdbc-conn',
+                'Path': 'test_db/test_table>',
+                'Exclusions': [
+                    'string',
+                ],
+            }
+        ],
+        'MongoDBTargets': [
+            {'ConnectionName': 'test-mongo-conn', 'Path': 
'test_db/test_collection', 'ScanAll': True}
+        ],
+        'DynamoDBTargets': [{'Path': 'test_db/test_table', 'scanAll': True, 
'scanRate': 123.0}],
+        'CatalogTargets': [
+            {
+                'DatabaseName': 'test_glue_db',
+                'Tables': [
+                    'test',
+                ],
+            }
+        ],
+    },
+    'Classifiers': ['test-classifier'],
+    'TablePrefix': 'test',
+    'SchemaChangePolicy': {
+        'UpdateBehavior': 'UPDATE_IN_DATABASE',
+        'DeleteBehavior': 'DEPRECATE_IN_DATABASE',
+    },
+    'RecrawlPolicy': {'RecrawlBehavior': 'CRAWL_EVERYTHING'},
+    'LineageConfiguration': 'ENABLE',
+    'Configuration': """
+    {
+        "Version": 1.0,
+        "CrawlerOutput": {
+            "Partitions": { "AddOrUpdateBehavior": "InheritFromTable" }
+        }
+    }
+    """,
+    'SecurityConfiguration': 'test',
+    'Tags': {'test': 'foo'},
+}
+
+
+class TestAwsGlueCrawlerHook(unittest.TestCase):
+    @cached_property
+    def setUp(self):
+        self.hook = AwsGlueCrawlerHook(aws_conn_id="aws_default", 
poll_interval=5)
+
+    @unittest.skipIf(mock_iam is None, 'mock_iam package not present')
+    @mock_iam
+    def test_get_iam_execution_role(self):
+        iam_role = self.hook.get_client_type('iam').create_role(
+            Path="/",
+            RoleName=mock_role_name,
+            AssumeRolePolicyDocument=json.dumps(
+                {
+                    "Version": "2012-10-17",
+                    "Statement": {
+                        "Effect": "Allow",
+                        "Principal": {"Service": "glue.amazonaws.com"},
+                        "Action": "sts:AssumeRole",
+                    },
+                }
+            ),
+        )
+        iam_role = self.hook.get_iam_execution_role(role_name=mock_role_name)
+
+        self.assertIsNotNone(iam_role)

Review comment:
       Perhaps it would be just as easy to verify that the role you mocked is 
the same role retrieved (which is slightly stronger than just confirming that the 
value is not None).

##########
File path: airflow/providers/amazon/aws/operators/glue_crawler.py
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AwsGlueCrawlerOperator(BaseOperator):
+    """
+    Creates, updates and triggers an AWS Glue Crawler. AWS Glue Crawler is a 
serverless
+    service that manages a catalog of metadata tables that contain the inferred
+    schema, format and data types of data stores within the AWS cloud.
+
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param poll_interval: Time (in seconds) to wait between two consecutive 
calls to check crawler status
+    :type poll_interval: int
+    :param config = Configurations for the AWS Glue crawler
+    :type config = Optional[dict]

Review comment:
       The `config` param is required, not optional.

##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,181 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+    :param poll_interval: Time (in seconds) to wait between two consecutive 
calls to check crawler status
+    :type poll_interval: int
+    :param config = Configurations for the AWS Glue crawler
+    :type config = Optional[dict]
+    """
+
+    def __init__(self, poll_interval, *args, **kwargs):
+        self.poll_interval = poll_interval
+        kwargs['client_type'] = 'glue'
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def glue_client(self):
+        """:return: AWS Glue client"""
+        return self.get_conn()
+
+    def get_iam_execution_role(self, role_name) -> str:
+        """:return: iam role for crawler execution"""
+        iam_client = self.get_client_type('iam', self.region_name)
+
+        try:
+            glue_execution_role = iam_client.get_role(RoleName=role_name)
+            self.log.info("Iam Role Name: %s", role_name)
+            return glue_execution_role
+        except Exception as general_error:
+            self.log.error("Failed to create aws glue crawler, error: %s", 
general_error)
+            raise
+
+    def get_or_create_crawler(self, config) -> str:
+        """
+        Creates the crawler if the crawler doesn't exists and returns the 
crawler name
+
+        :param config = Configurations for the AWS Glue crawler
+        :type config = Optional[dict]
+        :return: Name of the crawler
+        """
+        self.get_iam_execution_role(config["Role"])
+
+        try:
+            self.glue_client.get_crawler(**config)
+            self.log.info("Crawler already exists")
+            try:
+                self.glue_client.update_crawler(**config)
+                return config["Name"]
+            except Exception as general_error:
+                self.log.error("Failed to update aws glue crawler, error: %s", 
general_error)
+                raise
+        except self.glue_client.exceptions.EntityNotFoundException:
+            self.log.info("Creating AWS Glue crawler")
+            try:
+                self.glue_client.create_crawler(**config)
+                return config["Name"]
+            except Exception as general_error:
+                self.log.error("Failed to create aws glue crawler, error: %s", 
general_error)
+                raise
+
+    def start_crawler(self, crawler_name: str) -> str:
+        """
+        Triggers the AWS Glue crawler
+        :return: Empty dictionary
+        """
+        try:
+            crawler_run = self.glue_client.start_crawler(Name=crawler_name)
+            return crawler_run
+        except Exception as general_error:

Review comment:
       Again, I'm not sure it's worth catching only to immediately re-raise... 
aren't the exceptions you get from the API clear enough?
   

##########
File path: airflow/providers/amazon/aws/operators/glue_crawler.py
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AwsGlueCrawlerOperator(BaseOperator):
+    """
+    Creates, updates and triggers an AWS Glue Crawler. AWS Glue Crawler is a 
serverless
+    service that manages a catalog of metadata tables that contain the inferred
+    schema, format and data types of data stores within the AWS cloud.
+
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param poll_interval: Time (in seconds) to wait between two consecutive 
calls to check crawler status
+    :type poll_interval: int
+    :param config = Configurations for the AWS Glue crawler
+    :type config = Optional[dict]
+    """
+
+    template_fields = ()

Review comment:
       If you aren't changing these, you can just inherit them.
   But does it make sense to template the `config` param?

##########
File path: tests/providers/amazon/aws/hooks/test_glue_crawler.py
##########
@@ -0,0 +1,142 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+import unittest
+from unittest import mock
+
+from cached_property import cached_property
+
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+
+try:
+    from moto import mock_iam
+except ImportError:
+    mock_iam = None
+
+mock_crawler_name = 'test-crawler'
+mock_role_name = 'test-role'
+mock_config = {
+    'Name': mock_crawler_name,
+    'Description': 'Test glue crawler from Airflow',
+    'DatabaseName': 'test_db',
+    'Role': mock_role_name,
+    'Targets': {
+        'S3Targets': [
+            {
+                'Path': 's3://test-glue-crawler/foo/',
+                'Exclusions': [
+                    's3://test-glue-crawler/bar/',
+                ],
+                'ConnectionName': 'test-s3-conn',
+            }
+        ],
+        'JdbcTargets': [
+            {
+                'ConnectionName': 'test-jdbc-conn',
+                'Path': 'test_db/test_table>',
+                'Exclusions': [
+                    'string',
+                ],
+            }
+        ],
+        'MongoDBTargets': [
+            {'ConnectionName': 'test-mongo-conn', 'Path': 
'test_db/test_collection', 'ScanAll': True}
+        ],
+        'DynamoDBTargets': [{'Path': 'test_db/test_table', 'scanAll': True, 
'scanRate': 123.0}],
+        'CatalogTargets': [
+            {
+                'DatabaseName': 'test_glue_db',
+                'Tables': [
+                    'test',
+                ],
+            }
+        ],
+    },
+    'Classifiers': ['test-classifier'],
+    'TablePrefix': 'test',
+    'SchemaChangePolicy': {
+        'UpdateBehavior': 'UPDATE_IN_DATABASE',
+        'DeleteBehavior': 'DEPRECATE_IN_DATABASE',
+    },
+    'RecrawlPolicy': {'RecrawlBehavior': 'CRAWL_EVERYTHING'},
+    'LineageConfiguration': 'ENABLE',
+    'Configuration': """
+    {
+        "Version": 1.0,
+        "CrawlerOutput": {
+            "Partitions": { "AddOrUpdateBehavior": "InheritFromTable" }
+        }
+    }
+    """,
+    'SecurityConfiguration': 'test',
+    'Tags': {'test': 'foo'},
+}
+
+
+class TestAwsGlueCrawlerHook(unittest.TestCase):
+    @cached_property
+    def setUp(self):
+        self.hook = AwsGlueCrawlerHook(aws_conn_id="aws_default", 
poll_interval=5)
+
+    @unittest.skipIf(mock_iam is None, 'mock_iam package not present')
+    @mock_iam
+    def test_get_iam_execution_role(self):
+        iam_role = self.hook.get_client_type('iam').create_role(
+            Path="/",
+            RoleName=mock_role_name,
+            AssumeRolePolicyDocument=json.dumps(
+                {
+                    "Version": "2012-10-17",
+                    "Statement": {
+                        "Effect": "Allow",
+                        "Principal": {"Service": "glue.amazonaws.com"},
+                        "Action": "sts:AssumeRole",
+                    },
+                }
+            ),
+        )
+        iam_role = self.hook.get_iam_execution_role(role_name=mock_role_name)
+
+        self.assertIsNotNone(iam_role)
+
+    @mock.patch.object(AwsGlueCrawlerHook, "get_iam_execution_role")
+    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
+    def test_get_or_create_crawler(self, mock_get_conn, 
mock_get_iam_execution_role):
+        mock_get_iam_execution_role.return_value = 
mock.MagicMock(Role={'RoleName': mock_role_name})
+
+        mock_glue_crawler = 
mock_get_conn.return_value.get_crawler()['Crawler']['Name']

Review comment:
       I think what you are trying to do is mock `get_crawler` to return some 
value, and then confirm that `get_or_create_crawler` returns this same value. But 
here you are not correctly mocking a return value -- you are just setting 
`mock_glue_crawler` equal to a mock object `<MagicMock 
name='get_conn().get_crawler()[41 chars]936'>`.

##########
File path: airflow/providers/amazon/aws/operators/glue_crawler.py
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AwsGlueCrawlerOperator(BaseOperator):
+    """
+    Creates, updates and triggers an AWS Glue Crawler. AWS Glue Crawler is a 
serverless
+    service that manages a catalog of metadata tables that contain the inferred
+    schema, format and data types of data stores within the AWS cloud.
+
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param poll_interval: Time (in seconds) to wait between two consecutive 
calls to check crawler status
+    :type poll_interval: int
+    :param config = Configurations for the AWS Glue crawler
+    :type config = Optional[dict]
+    """
+
+    template_fields = ()
+    template_ext = ()
+    ui_color = '#ededed'
+
+    @apply_defaults
+    def __init__(
+        self,
+        config,
+        aws_conn_id='aws_default',
+        poll_interval: int = 5,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.aws_conn_id = (aws_conn_id,)

Review comment:
       This should be a string, not a tuple -- the trailing comma in `self.aws_conn_id = (aws_conn_id,)` creates a one-element tuple.

##########
File path: airflow/providers/amazon/aws/sensors/glue_crawler.py
##########
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AwsGlueCrawlerSensor(BaseSensorOperator):
+    """
+    Waits for an AWS Glue crawler to reach any of the statuses below
+    'FAILED', 'CANCELLED', 'SUCCEEDED'
+    :param crawler_name: The AWS Glue crawler unique name
+    :type crawler_name: str
+    """
+
+    template_fields = 'crawler_name'
+
+    @apply_defaults
+    def __init__(self, *, crawler_name: str, aws_conn_id: str = 'aws_default', 
**kwargs):
+        super().__init__(**kwargs)
+        self.crawler_name = crawler_name
+        self.aws_conn_id = aws_conn_id
+        self.success_statuses = ['SUCCEEDED']
+        self.errored_statuses = ['FAILED', 'CANCELLED']

Review comment:
       These probably make more sense as class-level constants.

##########
File path: tests/providers/amazon/aws/sensors/test_glue_crawler.py
##########
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+from airflow import configuration
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.providers.amazon.aws.sensors.glue_crawler import 
AwsGlueCrawlerSensor
+
+
+class TestAwsGlueCrawlerSensor(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+
+    @mock.patch.object(AwsGlueCrawlerHook, 'get_conn')
+    @mock.patch.object(AwsGlueCrawlerHook, 'get_crawler_state')
+    def test_poke(self, mock_get_crawler_state, mock_conn):

Review comment:
       You could consider combining these tests, parameterizing over the various 
statuses and their expected outcomes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to