mrshu commented on issue #6871: [AIRFLOW-6319] Add support for AWS Athena workgroups
URL: https://github.com/apache/airflow/pull/6871#issuecomment-584066252

Hey everyone,

Thanks a ton for implementing this @bhavika! When we tried these changes today (after updating to 1.10.9), however, it turned out they did not work for us. There seem to be two reasons:

1. When you do not create a workgroup on AWS, the default one is called `primary` rather than `default`. This is what the [AWS Docs say](https://docs.aws.amazon.com/athena/latest/ug/workgroups-procedure.html):

   > By default, if you have not created any workgroups, all queries in your account run in the primary workgroup.

2. Even after setting the `workgroup` parameter to `primary`, the `AWSAthenaOperator` still failed for us with the following error:

   ```
   Unknown parameter in input: "Workgroup", must be one of: QueryString, ClientRequestToken, QueryExecutionContext, ResultConfiguration, WorkGroup
   ```

   It turns out that the correct parameter is actually called `WorkGroup`, as opposed to the `Workgroup` the code currently uses (https://github.com/apache/airflow/pull/6871/files#diff-a5011c8e96e573b3c2a951d1450bd9eaR78).

If you think it makes sense, I'll be happy to send a PR that fixes both of these issues. Let me know what you think!
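The parameter-casing issue can be sanity-checked without touching AWS at all. A minimal sketch below (the query string, token, database, and S3 path are made-up placeholders) mirrors the set of keyword arguments listed in the error message and shows why `Workgroup` is rejected while `WorkGroup` is accepted:

```python
# Parameters accepted by Athena's StartQueryExecution, as listed in the
# "Unknown parameter in input" error message above. Note the capital G.
ACCEPTED = {
    "QueryString",
    "ClientRequestToken",
    "QueryExecutionContext",
    "ResultConfiguration",
    "WorkGroup",
}

# Hypothetical kwargs for start_query_execution; all values are placeholders.
params = {
    "QueryString": "SELECT 1",
    "ClientRequestToken": "0123456789abcdef0123456789abcdef",
    "QueryExecutionContext": {"Database": "my_db"},
    "ResultConfiguration": {"OutputLocation": "s3://my-bucket/athena-results/"},
    "WorkGroup": "primary",  # name of the default workgroup when none was created
}

# With the corrected casing there are no unknown parameters left; passing
# "Workgroup" instead would land it in this set and trigger the error.
unknown = set(params) - ACCEPTED
print(unknown)  # -> set()
```

This is of course only a model of the validation; the real check is done by botocore against the Athena service definition.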
------------------------------------------------

Just for the sake of completeness (and because I assume a few more people may run into this), we now use the following custom `AWSAthenaOperator`:

```python
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from airflow.contrib.hooks.aws_athena_hook import AWSAthenaHook


class CustomAWSAthenaOperator(AWSAthenaOperator):
    # Monkey patch the workgroup because the default is 'primary' for us, not
    # 'default' as Airflow uses
    # https://github.com/apache/airflow/pull/6871
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.workgroup = 'primary'

    def execute(self, context):
        self.hook = self.get_hook()
        self.query_execution_context['Database'] = self.database
        self.result_configuration['OutputLocation'] = self.output_location
        conn = self.hook.get_conn()
        # Note the corrected 'WorkGroup' casing expected by boto3.
        self.res = conn.start_query_execution(
            QueryString=self.query,
            ClientRequestToken=self.client_request_token,
            QueryExecutionContext=self.query_execution_context,
            ResultConfiguration=self.result_configuration,
            WorkGroup=self.workgroup)
        self.query_execution_id = self.res['QueryExecutionId']
        query_status = self.hook.poll_query_status(self.query_execution_id,
                                                   self.max_tries)
        if query_status in AWSAthenaHook.FAILURE_STATES:
            error_message = self.hook.get_state_change_reason(
                self.query_execution_id)
            raise Exception(
                'Final state of Athena job is {}, query_execution_id is {}. '
                'Error: {}'.format(query_status, self.query_execution_id,
                                   error_message))
        elif (not query_status
              or query_status in AWSAthenaHook.INTERMEDIATE_STATES):
            raise Exception(
                'Final state of Athena job is {}. '
                'Max tries of poll status exceeded, query_execution_id is {}.'
                .format(query_status, self.query_execution_id))
        return self.query_execution_id
```

------------------------------

> I think for now its enough - once we have a basic framework for running the system tests (and some credits provided by Google Cloud/AWS/Azure) we might revisit that and add more tests for that.

@potiuk I just wanted to second that this framework would be a great addition to the testing pipeline, as issues like the ones we've encountered here might surface much earlier. Is there any way I can help with that?

Thanks!
----------------------------------------------------------------

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: [email protected]

With regards,
Apache Git Services
