vincbeck commented on code in PR #22980:
URL: https://github.com/apache/airflow/pull/22980#discussion_r850745387


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -404,3 +404,80 @@ def execute(self, context: 'Context') -> None:
             raise AirflowException(f'JobFlow termination failed: {response}')
         else:
             self.log.info('JobFlow with id %s terminated', self.job_flow_id)
+
+
+class EmrAutoTerminatePolicyOperator(BaseOperator):
+    """
+    An operator to put auto terminate policy on a given cluster/jobflow
+    Note: auto terminate policy is supported with Amazon EMR versions 5.30.0 
and 6.1.0 and later.

Review Comment:
   I would do it this way, adding three parameters:
   - `policy_name`, which indicates which policy you want to update. This value 
determines which API to call through 
[boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.put_auto_termination_policy)
   - `policy_content`, which contains the actual content of the policy, e.g.
   
   ```
   {
       'AutoTerminationPolicy': {
           'IdleTimeout': 123
       }
   }
   ```
   - `instance_group_id`, an optional parameter that is only needed if 
`policy_name` == "auto_scaling"
   
   The code would look like this (pseudocode, please bear with me):
   
   
   ```
   def __init__(
       self,
       policy_name: str,
       policy_content: dict,
       instance_group_id: Optional[str] = None,
       job_flow_id: Optional[str] = None,
       job_flow_name: Optional[str] = None,
       cluster_states: Optional[List[str]] = None,
       aws_conn_id: str = 'aws_default',
       **kwargs
   ):
       ...
   
   
   def execute(self, context: 'Context') -> None:
       ...
   
       if self.policy_name == "auto_termination":
           response = emr.put_auto_termination_policy(
               ClusterId=job_flow_id,
               **self.policy_content,
           )
       elif self.policy_name == "auto_scaling":
           response = emr.put_auto_scaling_policy(
               ClusterId=job_flow_id,
               InstanceGroupId=self.instance_group_id,
               **self.policy_content,
           )
       elif self.policy_name == "managed_scaling":
           response = emr.put_managed_scaling_policy(
               ClusterId=job_flow_id,
               **self.policy_content,
           )
       else:
           raise ...
   ```
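
The dispatch in the pseudocode above could also be written as a lookup table, which keeps the three branches in one place. This is only a sketch under assumptions: `put_emr_policy` and `_POLICY_METHODS` are hypothetical names that are not part of the PR, `emr_client` is assumed to be a boto3 EMR client (or anything exposing the same three `put_*_policy` methods), and `policy_content` is assumed to already use the keyword names boto3 expects (for example `AutoTerminationPolicy`).

```python
from typing import Any, Dict, Optional

# Maps the proposed policy_name values to the boto3 EMR client method to call.
_POLICY_METHODS = {
    "auto_termination": "put_auto_termination_policy",
    "auto_scaling": "put_auto_scaling_policy",
    "managed_scaling": "put_managed_scaling_policy",
}


def put_emr_policy(
    emr_client: Any,
    policy_name: str,
    policy_content: Dict[str, Any],
    cluster_id: str,
    instance_group_id: Optional[str] = None,
) -> Any:
    """Hypothetical helper: call the put-policy API selected by policy_name."""
    if policy_name not in _POLICY_METHODS:
        raise ValueError(
            f"Unsupported policy_name {policy_name!r}; "
            f"expected one of {sorted(_POLICY_METHODS)}"
        )

    # policy_content is expanded into keyword arguments, as in the pseudocode.
    kwargs: Dict[str, Any] = {"ClusterId": cluster_id, **policy_content}

    # Only the auto scaling API takes an instance group.
    if policy_name == "auto_scaling":
        if not instance_group_id:
            raise ValueError(
                "instance_group_id is required when policy_name is 'auto_scaling'"
            )
        kwargs["InstanceGroupId"] = instance_group_id

    method = getattr(emr_client, _POLICY_METHODS[policy_name])
    return method(**kwargs)
```

Inside the operator, `execute` would then reduce to resolving the cluster id and calling this helper once. Validating `instance_group_id` up front gives the user a clear error instead of a boto3 parameter validation failure.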
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
