mrshu commented on issue #6871: [AIRFLOW-6319] Add support for AWS Athena 
workgroups
URL: https://github.com/apache/airflow/pull/6871#issuecomment-584066252
 
 
   Hey everyone,
   
   Thanks a ton for implementing this @bhavika!
   
   When we tried these changes today (after updating to 1.10.9), however, it turned out they did not really work for us. There seem to be two reasons:
   
   1. It seems that when you do not create a workgroup on AWS, the default workgroup is called `primary` rather than `default`. This is what the [AWS Docs say](https://docs.aws.amazon.com/athena/latest/ug/workgroups-procedure.html):
   
   > By default, if you have not created any workgroups, all queries in your account run in the primary workgroup
   
   2. Even after setting the `workgroup` parameter to `primary`, the AWSAthenaOperator still did not work for us and failed with the following error:
   
   ```
   Unknown parameter in input: "Workgroup", must be one of: QueryString, 
ClientRequestToken, QueryExecutionContext, ResultConfiguration, WorkGroup
   ```
   It turns out that the correct parameter is actually called `WorkGroup`, not the `Workgroup` the code currently uses (https://github.com/apache/airflow/pull/6871/files#diff-a5011c8e96e573b3c2a951d1450bd9eaR78).
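   For anyone who wants to see the failure mode without touching AWS: botocore validates the top-level request parameters case-sensitively, which is why `Workgroup` is rejected outright. Here is a minimal, self-contained sketch in plain Python (no boto3 needed; `check_params` is only an illustrative stand-in for botocore's validator, not its real code):

```python
# Top-level parameters Athena's StartQueryExecution accepts (per the
# boto3/Athena API) -- note the capital 'G' in 'WorkGroup'.
VALID_PARAMS = {
    'QueryString',
    'ClientRequestToken',
    'QueryExecutionContext',
    'ResultConfiguration',
    'WorkGroup',
}


def check_params(params):
    """Mimic botocore's strict, case-sensitive validation of input keys."""
    unknown = set(params) - VALID_PARAMS
    if unknown:
        raise ValueError(
            'Unknown parameter in input: "{}", must be one of: {}'.format(
                '", "'.join(sorted(unknown)),
                ', '.join(sorted(VALID_PARAMS))))


check_params({'QueryString': 'SELECT 1', 'WorkGroup': 'primary'})  # passes
```

   Passing `Workgroup='primary'` instead raises a `ValueError`, mirroring the "Unknown parameter in input" message quoted above.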
   
   If you think it makes sense, I'll be happy to send a PR that fixes both of these issues. Let me know what you think!
   
   ------------------------------------------------
   
   Just for the sake of completeness (and because I assume a few more people may run into this), we now use the following custom `AWSAthenaOperator`:
   
   ```python
   from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
   from airflow.contrib.hooks.aws_athena_hook import AWSAthenaHook
   
   
   class CustomAWSAthenaOperator(AWSAthenaOperator):
       # Monkey patch the workgroup because the default is 'primary' for us, not
       # 'default' as Airflow uses
       # https://github.com/apache/airflow/pull/6871
       def __init__(self, *args, **kwargs):
           super().__init__(*args, **kwargs)
           self.workgroup = 'primary'
   
       def execute(self, context):
           self.hook = self.get_hook()
   
           self.query_execution_context['Database'] = self.database
           self.result_configuration['OutputLocation'] = self.output_location
   
           conn = self.hook.get_conn()
   
            self.res = conn.start_query_execution(
                QueryString=self.query,
                ClientRequestToken=self.client_request_token,
                QueryExecutionContext=self.query_execution_context,
                ResultConfiguration=self.result_configuration,
                WorkGroup=self.workgroup)
   
           self.query_execution_id = self.res['QueryExecutionId']
           query_status = self.hook.poll_query_status(self.query_execution_id,
                                                      self.max_tries)
   
            if query_status in AWSAthenaHook.FAILURE_STATES:
                error_message = self.hook.get_state_change_reason(
                    self.query_execution_id)
                raise Exception(
                    'Final state of Athena job is {}, query_execution_id '
                    'is {}. Error: {}'.format(query_status,
                                              self.query_execution_id,
                                              error_message))
            elif (not query_status
                    or query_status in AWSAthenaHook.INTERMEDIATE_STATES):
                raise Exception(
                    'Final state of Athena job is {}. '
                    'Max tries of poll status exceeded, query_execution_id '
                    'is {}.'.format(query_status, self.query_execution_id))
   
           return self.query_execution_id
   ```
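   As an aside, the `poll_query_status` call the operator above relies on simply polls the query state until it leaves an intermediate state or the tries are exhausted. A rough sketch of that loop in plain Python (the state names match the Athena API; the real hook's sleep/backoff timing differs, so treat this only as an illustration):

```python
# Athena query states, mirroring AWSAthenaHook's state constants.
INTERMEDIATE_STATES = ('QUEUED', 'RUNNING')
FAILURE_STATES = ('FAILED', 'CANCELLED')
SUCCESS_STATES = ('SUCCEEDED',)


def poll_query_status(get_state, max_tries, sleep=lambda seconds: None):
    """Poll get_state() until a terminal state is seen or tries run out.

    Returns the last observed state, or None if nothing was observed.
    """
    state = None
    for try_number in range(1, max_tries + 1):
        state = get_state()
        if state not in INTERMEDIATE_STATES:
            return state
        sleep(try_number)  # illustrative pause; the real hook's timing differs
    return state


# Example: a query that succeeds on the third poll.
states = iter(['QUEUED', 'RUNNING', 'SUCCEEDED'])
print(poll_query_status(lambda: next(states), max_tries=5))  # SUCCEEDED
```

   If the loop exhausts `max_tries` while the state is still intermediate, the operator above raises the "Max tries of poll status exceeded" exception.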
   ------------------------------
   
   > I think for now its enough - once we have a basic framework for running 
the system tests (and some credits provided by Google Cloud/AWS/Azure) we might 
revisit that and add more tests for that.
   
   @potiuk I just wanted to second that this framework would be a great addition to the testing pipeline, as issues like the ones we've encountered here would surface much earlier. Is there any way I can help with that?
   
   Thanks!
