marshall7m commented on a change in pull request #13072: URL: https://github.com/apache/airflow/pull/13072#discussion_r556723139
########## File path: airflow/providers/amazon/aws/hooks/glue_crawler.py ########## @@ -0,0 +1,169 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from time import sleep + +from cached_property import cached_property + +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook + + +class AwsGlueCrawlerHook(AwsBaseHook): + """ + Interacts with AWS Glue Crawler + :param config = Configurations for the AWS Glue crawler + :type config = dict + """ + + def __init__(self, *args, **kwargs): + kwargs['client_type'] = 'glue' + super().__init__(*args, **kwargs) + + @cached_property + def glue_client(self): + """:return: AWS Glue client""" + return self.get_conn() + + def check_iam_role(self, role_name: str) -> str: + """ + Checks if the input IAM role name is a + valid pre-existing role within the caller's AWS account. + Is needed because the current Boto3 (<=1.16.46) + glue client create_crawler() method misleadingly catches + non-existing role as a role trust policy error. 
+ :param role_name = IAM role name + :type role_name = str + :return: IAM role name + """ + iam_client = self.get_client_type('iam', self.region_name) + + iam_client.get_role(RoleName=role_name) + + def get_or_create_crawler(self, **crawler_kwargs) -> str: + """ + Creates the crawler if the crawler doesn't exists and returns the crawler name + + :param crawler_kwargs = Keyword args that define the configurations used to create/update the crawler + :type crawler_kwargs = any + :return: Name of the crawler + """ + crawler_name = crawler_kwargs['Name'] + try: + glue_response = self.glue_client.get_crawler(Name=crawler_name) + self.log.info("Crawler %s already exists; updating crawler", crawler_name) + self.glue_client.update_crawler(**crawler_kwargs) + return glue_response['Crawler']['Name'] + except self.glue_client.exceptions.EntityNotFoundException: + self.log.info("Creating AWS Glue crawler %s", crawler_name) + try: + glue_response = self.glue_client.create_crawler(**crawler_kwargs) + return glue_response['Crawler']['Name'] + except self.glue_client.exceptions.InvalidInputException as general_error: + self.check_iam_role(crawler_kwargs['Role']) + raise AirflowException(general_error) Review comment: Maybe the operator should use an if/else, since the `get_crawler()` function will also contain a call to the Glue API's `update_crawler()` function, and it would be unnecessary to call `update_crawler()` after running `create_crawler()`. For example: ``` if not self.hook.has_crawler(**self.config): crawler_name = self.hook.create_crawler(**self.config) else: crawler_name = self.hook.get_crawler(**self.config) ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
