ashb commented on a change in pull request #16571:
URL: https://github.com/apache/airflow/pull/16571#discussion_r678602421



##########
File path: airflow/providers/amazon/aws/operators/eks.py
##########
@@ -0,0 +1,797 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""This module contains Amazon EKS operators."""
+import json
+from datetime import datetime
+from time import sleep
+from typing import Dict, List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eks import DEFAULT_CONTEXT_NAME, 
DEFAULT_POD_USERNAME, EKSHook
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import 
KubernetesPodOperator
+
+CHECK_INTERVAL_SECONDS = 15
+TIMEOUT_SECONDS = 25 * 60
+DEFAULT_COMPUTE_TYPE = 'nodegroup'
+DEFAULT_CONN_ID = "aws_default"
+DEFAULT_NAMESPACE_NAME = 'default'
+DEFAULT_NODEGROUP_NAME_SUFFIX = '-nodegroup'
+DEFAULT_POD_NAME = 'pod'
+
+
+class EKSCreateClusterOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Cluster control plane.
+
+    Optionally, can also create the supporting compute architecture:
+    If argument 'compute' is provided with a value of 'nodegroup', will also 
attempt to create an Amazon
+    EKS Managed Nodegroup for the cluster.  See EKSCreateNodegroupOperator 
documentation for requirements.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSCreateClusterOperator`
+
+    :param cluster_name: The unique name to give to your Amazon EKS Cluster.
+    :type cluster_name: str
+    :param cluster_role_arn: The Amazon Resource Name (ARN) of the IAM role 
that provides permissions for the
+       Kubernetes control plane to make calls to AWS API operations on your 
behalf.
+    :type cluster_role_arn: str
+    :param resources_vpc_config: The VPC configuration used by the cluster 
control plane.
+    :type resources_vpc_config: Dict
+    :param compute: The type of compute architecture to generate along with 
the cluster.
+        Defaults to 'nodegroup' to generate an EKS Managed Nodegroup.
+    :type compute: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+    :param region: Which AWS region the connection should use.
+        If this is None or empty then the default boto3 behaviour is used.
+    :type region: str
+
+    If compute is assigned the value of ``nodegroup``, the following are 
required:
+
+    :param nodegroup_name: The unique name to give your EKS Managed Nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_role_arn: The Amazon Resource Name (ARN) of the IAM role 
to associate
+         with the EKS Managed Nodegroup.
+    :type nodegroup_role_arn: str
+
+    """
+
+    template_fields = (
+        "cluster_name",
+        "cluster_role_arn",
+        "resources_vpc_config",
+        "nodegroup_name",
+        "nodegroup_role_arn",
+        "compute",
+        "aws_conn_id",
+        "region",
+    )
+
+    def __init__(
+        self,
+        cluster_name: str,
+        cluster_role_arn: str,
+        resources_vpc_config: Dict,
+        nodegroup_name: Optional[str] = None,
+        nodegroup_role_arn: Optional[str] = None,
+        compute: Optional[str] = DEFAULT_COMPUTE_TYPE,
+        aws_conn_id: Optional[str] = DEFAULT_CONN_ID,
+        region: Optional[str] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.cluster_name = cluster_name
+        self.cluster_role_arn = cluster_role_arn
+        self.resources_vpc_config = resources_vpc_config
+        self.compute = compute
+        self.aws_conn_id = aws_conn_id
+        self.region = region
+
+        if self.compute == 'nodegroup':
+            self.nodegroup_name = nodegroup_name or self.cluster_name + 
DEFAULT_NODEGROUP_NAME_SUFFIX
+            if nodegroup_role_arn:
+                self.nodegroup_role_arn = nodegroup_role_arn
+            else:
+                message = "Creating an EKS Managed Nodegroup requires 
nodegroup_role_arn to be passed in."
+                self.log.error(message)
+                raise AttributeError(message)
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.aws_conn_id,
+            region_name=self.region,
+        )
+
+        eks_hook.create_cluster(
+            name=self.cluster_name,
+            roleArn=self.cluster_role_arn,
+            resourcesVpcConfig=self.resources_vpc_config,
+        )
+
+        if self.compute is not None:
+            self.log.info("Waiting for EKS Cluster to provision.  This will 
take some time.")
+
+            countdown = TIMEOUT_SECONDS
+            while eks_hook.get_cluster_state(clusterName=self.cluster_name) != 
"ACTIVE":
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    self.log.info(
+                        "Waiting for cluster to start.  Checking again in %d 
seconds", CHECK_INTERVAL_SECONDS
+                    )
+                    sleep(CHECK_INTERVAL_SECONDS)
+                else:
+                    message = (
+                        "Cluster is still inactive after the allocated time 
limit.  "
+                        "Failed cluster will be torn down."
+                    )
+                    self.log.error(message)
+                    # If there is something preventing the cluster for 
activating, tear it down and abort.
+                    eks_hook.delete_cluster(name=self.cluster_name)
+                    raise RuntimeError(message)

Review comment:
       Oh, one more thing: is `delete_cluster` sync or async? If it's async (and 
we just wait for the request), then we don't need the log.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to