ashb commented on a change in pull request #16571:
URL: https://github.com/apache/airflow/pull/16571#discussion_r660433624



##########
File path: airflow/providers/amazon/aws/operators/eks.py
##########
@@ -0,0 +1,737 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""This module contains Amazon EKS operators."""
+import json
+import os
+from datetime import datetime
+from time import sleep
+from typing import Dict, List, Optional
+
+from boto3 import Session
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eks import DEFAULT_PAGINATION_TOKEN, DEFAULT_RESULTS_PER_PAGE, EKSHook
+from airflow.providers.amazon.aws.utils.eks_kube_config import (
+    DEFAULT_CONTEXT_NAME,
+    DEFAULT_KUBE_CONFIG_PATH,
+    DEFAULT_NAMESPACE_NAME,
+    DEFAULT_POD_USERNAME,
+    generate_config_file,
+)
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+
+CHECK_INTERVAL_SECONDS = 15
+TIMEOUT_SECONDS = 25 * 60
+CONN_ID = "eks"
+REGION = Session().region_name
+DEFAULT_COMPUTE_TYPE = 'nodegroup'
+DEFAULT_NODEGROUP_NAME_SUFFIX = '-nodegroup'
+DEFAULT_POD_NAME = 'pod'
+KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'
+
+
+class EKSCreateClusterOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Cluster control plane.
+
+    Optionally, can also create the supporting compute architecture:
+    If argument 'compute' is provided with a value of 'nodegroup', will also attempt to create an Amazon
+    EKS Managed Nodegroup for the cluster.  See EKSCreateNodegroupOperator documentation for requirements.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSCreateClusterOperator`
+
+    :param cluster_name: The unique name to give to your Amazon EKS Cluster.
+    :type cluster_name: str
+    :param cluster_role_arn: The Amazon Resource Name (ARN) of the IAM role that provides permissions for the
+       Kubernetes control plane to make calls to AWS API operations on your behalf.
+    :type cluster_role_arn: str
+    :param resources_vpc_config: The VPC configuration used by the cluster control plane.
+    :type resources_vpc_config: Dict
+    :param compute: The type of compute architecture to generate along with the cluster.
+        Defaults to 'nodegroup' to generate an EKS Managed Nodegroup.
+    :type compute: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    If 'compute' is 'nodegroup', the following are required:
+
+    :param nodegroup_name: The unique name to give your EKS Managed Nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_role_arn: The Amazon Resource Name (ARN) of the IAM role to associate
+         with the EKS Managed Nodegroup.
+    :type nodegroup_role_arn: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        cluster_role_arn: str,
+        resources_vpc_config: Dict,
+        nodegroup_name: Optional[str] = None,
+        nodegroup_role_arn: Optional[str] = None,
+        compute: Optional[str] = DEFAULT_COMPUTE_TYPE,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.clusterRoleArn = cluster_role_arn
+        self.resourcesVpcConfig = resources_vpc_config
+        self.compute = compute
+        self.conn_id = conn_id
+        self.region = region
+
+        if self.compute == 'nodegroup':
+            self.nodegroupName = nodegroup_name or self.clusterName + DEFAULT_NODEGROUP_NAME_SUFFIX
+            if nodegroup_role_arn:
+                self.nodegroupRoleArn = nodegroup_role_arn
+            else:
+                message = "Creating an EKS Managed Nodegroup requires 
nodegroup_role_arn to be passed in."
+                self.log.error(message)
+                raise AttributeError(message)
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        eks_hook.create_cluster(
+            name=self.clusterName,
+            roleArn=self.clusterRoleArn,
+            resourcesVpcConfig=self.resourcesVpcConfig,
+        )
+
+        if self.compute is not None:
+            self.log.info("Waiting for EKS Cluster to provision.  This will 
take some time.")
+
+            countdown = TIMEOUT_SECONDS
+            while eks_hook.get_cluster_state(clusterName=self.clusterName) != 
"ACTIVE":
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    self.log.info(
+                        "Waiting for cluster to start.  Checking again in %d 
seconds", CHECK_INTERVAL_SECONDS
+                    )
+                    sleep(CHECK_INTERVAL_SECONDS)
+                else:
+                    message = "Cluster is still inactive after the allocated 
time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        if self.compute == 'nodegroup':
+            eks_hook.create_nodegroup(
+                clusterName=self.clusterName,
+                nodegroupName=self.nodegroupName,
+                subnets=self.resourcesVpcConfig.get('subnetIds'),
+                nodeRole=self.nodegroupRoleArn,
+            )
+
+
+class EKSCreateNodegroupOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Managed Nodegroup for an existing Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSCreateNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to create the managed nodegroup in.
+    :type cluster_name: str
+    :param nodegroup_name: The unique name to give your managed nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_subnets:
+        The subnets to use for the Auto Scaling group that is created for the managed nodegroup.
+    :type nodegroup_subnets: List[str]
+    :param nodegroup_role_arn:
+        The Amazon Resource Name (ARN) of the IAM role to associate with the managed nodegroup.
+    :type nodegroup_role_arn: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_subnets: List[str],
+        nodegroup_role_arn: str,
+        nodegroup_name: Optional[str],
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupSubnets = nodegroup_subnets
+        self.nodegroupRoleArn = nodegroup_role_arn
+        self.nodegroupName = nodegroup_name or cluster_name + datetime.now().strftime("%Y%m%d_%H%M%S")
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.create_nodegroup(
+            clusterName=self.clusterName,
+            nodegroupName=self.nodegroupName,
+            subnets=self.nodegroupSubnets,
+            nodeRole=self.nodegroupRoleArn,
+        )
+
+
+class EKSDeleteClusterOperator(BaseOperator):
+    """
+    Deletes the Amazon EKS Cluster control plane and all nodegroups attached to it.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSDeleteClusterOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to delete.
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self, cluster_name: str, conn_id: Optional[str] = CONN_ID, region: Optional[str] = REGION, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        nodegroups = eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')
+        nodegroup_count = len(nodegroups)
+        if nodegroup_count > 0:
+            self.log.info(
+                "A cluster can not be deleted with attached nodegroups.  Deleting %d nodegroups.",
+                nodegroup_count,
+            )
+            for group in nodegroups:
+                eks_hook.delete_nodegroup(clusterName=self.clusterName, nodegroupName=group)
+
+            # Scaling up the timeout based on the number of nodegroups that are being processed.
+            additional_seconds = 5 * 60
+            countdown = TIMEOUT_SECONDS + (nodegroup_count * additional_seconds)
+            while len(eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')) > 0:
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    sleep(CHECK_INTERVAL_SECONDS)
+                    self.log.info(
+                        "Waiting for the remaining %s nodegroups to delete.  Checking again in %d seconds.",
+                        nodegroup_count,
+                        CHECK_INTERVAL_SECONDS,
+                    )
+                else:
+                    message = "Nodegroups are still inactive after the allocated time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        self.log.info("No nodegroups remain, deleting cluster.")
+        return eks_hook.delete_cluster(name=self.clusterName)
+
+
+class EKSDeleteNodegroupOperator(BaseOperator):
+    """
+    Deletes an Amazon EKS Nodegroup from an Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSDeleteNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster that is associated with your nodegroup.
+    :type cluster_name: str
+    :param nodegroup_name: The name of the nodegroup to delete.
+    :type nodegroup_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_name: str,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupName = nodegroup_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.delete_nodegroup(clusterName=self.clusterName, nodegroupName=self.nodegroupName)
+
+
+class EKSDescribeAllClustersOperator(BaseOperator):
+    """
+    Describes all Amazon EKS Clusters in your AWS account.
+
+    :param max_results: The maximum number of results to return.
+    :type max_results: int
+    :param next_token: The nextToken value returned from a previous paginated execution.

Review comment:
       Generally the hook should handle the pagination/return a paginator (see the Athena and Glue hooks) and then the operator just iterates over it.
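   
   For illustration, a minimal sketch of that pattern, assuming the hook exposes its cached boto3 client as `conn` (the method name and shape here are illustrative, not the final API):
   
   ```python
   def list_clusters(self) -> List[str]:
       """Return the names of all EKS Clusters, letting boto3 drive pagination."""
       # get_paginator() follows nextToken across pages automatically,
       # so callers never deal with maxResults/nextToken themselves.
       paginator = self.conn.get_paginator('list_clusters')
       names: List[str] = []
       for page in paginator.paginate():
           names.extend(page['clusters'])
       return names
   ```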

##########
File path: airflow/providers/amazon/aws/hooks/eks.py
##########
@@ -0,0 +1,346 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""Interact with Amazon EKS, using the boto3 library."""
+
+import json
+from typing import Dict, List, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.utils.json import AirflowJsonEncoder
+
+DEFAULT_RESULTS_PER_PAGE = 100
+DEFAULT_PAGINATION_TOKEN = ''
+
+
+class EksHook(AwsBaseHook):
+    """
+    Interact with Amazon EKS, using the boto3 library.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    conn_type = 'eks'
+    conn_name = 'eks'
+    client_type = 'eks'
+    hook_name = 'EKS'
+
+    def __init__(self, *args, **kwargs) -> None:
+        kwargs["client_type"] = self.client_type
+        super().__init__(*args, **kwargs)
+
+    def create_cluster(self, name: str, roleArn: str, resourcesVpcConfig: Dict, **kwargs) -> Dict:
+        """
+        Creates an Amazon EKS control plane.
+
+        :param name: The unique name to give to your Amazon EKS Cluster.
+        :type name: str
+        :param roleArn: The Amazon Resource Name (ARN) of the IAM role that provides permissions
+          for the Kubernetes control plane to make calls to AWS API operations on your behalf.
+        :type roleArn: str
+        :param resourcesVpcConfig: The VPC configuration used by the cluster control plane.
+        :type resourcesVpcConfig: Dict
+
+        :return: Returns descriptive information about the created EKS Cluster.
+        :rtype: Dict
+        """
+        try:
+            eks_client = self.get_conn()
+
+            response = eks_client.create_cluster(
+                name=name, roleArn=roleArn, resourcesVpcConfig=resourcesVpcConfig, **kwargs
+            )
+
+            self.log.info("Created cluster with the name %s.", 
response.get('cluster').get('name'))
+            return response
+
+        except ClientError as e:
+            self.log.error(e.response["Error"]["Message"])
+            raise e

Review comment:
       Is this except useful? Given we are re-raising the error I would suggest not -- let this be handled by the caller and remove the try/except from all of these methods.
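   
   i.e. the slimmed-down method would read roughly as below (a sketch of the same body minus the handler; any `ClientError` then surfaces to the caller with its original traceback):
   
   ```python
   def create_cluster(self, name: str, roleArn: str, resourcesVpcConfig: Dict, **kwargs) -> Dict:
       eks_client = self.get_conn()
       # No try/except: a botocore ClientError propagates unchanged,
       # which is what ends up in the task log anyway.
       response = eks_client.create_cluster(
           name=name, roleArn=roleArn, resourcesVpcConfig=resourcesVpcConfig, **kwargs
       )
       self.log.info("Created cluster with the name %s.", response.get('cluster').get('name'))
       return response
   ```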

##########
File path: airflow/providers/amazon/aws/hooks/eks.py
##########
@@ -0,0 +1,346 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""Interact with Amazon EKS, using the boto3 library."""
+
+import json
+from typing import Dict, List, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.utils.json import AirflowJsonEncoder
+
+DEFAULT_RESULTS_PER_PAGE = 100
+DEFAULT_PAGINATION_TOKEN = ''
+
+
+class EksHook(AwsBaseHook):
+    """
+    Interact with Amazon EKS, using the boto3 library.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    conn_type = 'eks'
+    conn_name = 'eks'
+    client_type = 'eks'
+    hook_name = 'EKS'
+
+    def __init__(self, *args, **kwargs) -> None:
+        kwargs["client_type"] = self.client_type
+        super().__init__(*args, **kwargs)
+
+    def create_cluster(self, name: str, roleArn: str, resourcesVpcConfig: Dict, **kwargs) -> Dict:
+        """
+        Creates an Amazon EKS control plane.
+
+        :param name: The unique name to give to your Amazon EKS Cluster.
+        :type name: str
+        :param roleArn: The Amazon Resource Name (ARN) of the IAM role that provides permissions
+          for the Kubernetes control plane to make calls to AWS API operations on your behalf.
+        :type roleArn: str
+        :param resourcesVpcConfig: The VPC configuration used by the cluster control plane.
+        :type resourcesVpcConfig: Dict
+
+        :return: Returns descriptive information about the created EKS Cluster.
+        :rtype: Dict
+        """
+        try:
+            eks_client = self.get_conn()

Review comment:
       ```suggestion
               eks_client = self.conn
   ```
   
   `get_conn()` is quasi-deprecated, and the cached `conn` property is preferred.

##########
File path: airflow/providers/amazon/aws/operators/eks.py
##########
@@ -0,0 +1,737 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""This module contains Amazon EKS operators."""
+import json
+import os
+from datetime import datetime
+from time import sleep
+from typing import Dict, List, Optional
+
+from boto3 import Session
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eks import DEFAULT_PAGINATION_TOKEN, DEFAULT_RESULTS_PER_PAGE, EksHook
+from airflow.providers.amazon.aws.utils.eks_kube_config import (
+    DEFAULT_CONTEXT_NAME,
+    DEFAULT_KUBE_CONFIG_PATH,
+    DEFAULT_NAMESPACE_NAME,
+    DEFAULT_POD_USERNAME,
+    generate_config_file,
+)
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+
+CHECK_INTERVAL_SECONDS = 15
+TIMEOUT_SECONDS = 25 * 60
+CONN_ID = "eks"
+REGION = Session().region_name
+DEFAULT_COMPUTE_TYPE = 'nodegroup'
+DEFAULT_NODEGROUP_NAME_SUFFIX = '-nodegroup'
+DEFAULT_POD_NAME = 'pod'
+KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'
+
+
+class EKSCreateClusterOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Cluster control plane.
+
+    Optionally, can also create the supporting compute architecture:
+    If argument 'compute' is provided with a value of 'nodegroup', will also attempt to create an Amazon
+    EKS Managed Nodegroup for the cluster.  See EKSCreateNodegroupOperator documentation for requirements.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSCreateClusterOperator`
+
+    :param cluster_name: The unique name to give to your Amazon EKS Cluster.
+    :type cluster_name: str
+    :param cluster_role_arn: The Amazon Resource Name (ARN) of the IAM role that provides permissions for the
+       Kubernetes control plane to make calls to AWS API operations on your behalf.
+    :type cluster_role_arn: str
+    :param resources_vpc_config: The VPC configuration used by the cluster control plane.
+    :type resources_vpc_config: Dict
+    :param compute: The type of compute architecture to generate along with the cluster.
+        Defaults to 'nodegroup' to generate an EKS Managed Nodegroup.
+    :type compute: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    If 'compute' is 'nodegroup', the following are required:

Review comment:
       I don't think this will render correctly -- you'll have to express this in prose instead.
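   
   For example, something along these lines instead (illustrative wording only):
   
   ```
   :param nodegroup_name: The unique name to give your EKS Managed Nodegroup.
       Required if 'compute' is 'nodegroup'.
   :type nodegroup_name: str
   :param nodegroup_role_arn: The Amazon Resource Name (ARN) of the IAM role to associate
       with the EKS Managed Nodegroup.  Required if 'compute' is 'nodegroup'.
   :type nodegroup_role_arn: str
   ```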

##########
File path: airflow/providers/amazon/aws/operators/eks.py
##########
@@ -0,0 +1,737 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""This module contains Amazon EKS operators."""
+import json
+import os
+from datetime import datetime
+from time import sleep
+from typing import Dict, List, Optional
+
+from boto3 import Session
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eks import DEFAULT_PAGINATION_TOKEN, DEFAULT_RESULTS_PER_PAGE, EksHook
+from airflow.providers.amazon.aws.utils.eks_kube_config import (
+    DEFAULT_CONTEXT_NAME,
+    DEFAULT_KUBE_CONFIG_PATH,
+    DEFAULT_NAMESPACE_NAME,
+    DEFAULT_POD_USERNAME,
+    generate_config_file,
+)
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+
+CHECK_INTERVAL_SECONDS = 15
+TIMEOUT_SECONDS = 25 * 60
+CONN_ID = "eks"
+REGION = Session().region_name
+DEFAULT_COMPUTE_TYPE = 'nodegroup'
+DEFAULT_NODEGROUP_NAME_SUFFIX = '-nodegroup'
+DEFAULT_POD_NAME = 'pod'
+KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'
+
+
+class EKSCreateClusterOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Cluster control plane.
+
+    Optionally, can also create the supporting compute architecture:
+    If argument 'compute' is provided with a value of 'nodegroup', will also attempt to create an Amazon
+    EKS Managed Nodegroup for the cluster.  See EKSCreateNodegroupOperator documentation for requirements.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSCreateClusterOperator`
+
+    :param cluster_name: The unique name to give to your Amazon EKS Cluster.
+    :type cluster_name: str
+    :param cluster_role_arn: The Amazon Resource Name (ARN) of the IAM role that provides permissions for the
+       Kubernetes control plane to make calls to AWS API operations on your behalf.
+    :type cluster_role_arn: str
+    :param resources_vpc_config: The VPC configuration used by the cluster control plane.
+    :type resources_vpc_config: Dict
+    :param compute: The type of compute architecture to generate along with the cluster.
+        Defaults to 'nodegroup' to generate an EKS Managed Nodegroup.
+    :type compute: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    If 'compute' is 'nodegroup', the following are required:
+
+    :param nodegroup_name: The unique name to give your EKS Managed Nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_role_arn: The Amazon Resource Name (ARN) of the IAM role to associate
+         with the EKS Managed Nodegroup.
+    :type nodegroup_role_arn: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        cluster_role_arn: str,
+        resources_vpc_config: Dict,
+        nodegroup_name: Optional[str] = None,
+        nodegroup_role_arn: Optional[str] = None,
+        compute: Optional[str] = DEFAULT_COMPUTE_TYPE,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.clusterRoleArn = cluster_role_arn
+        self.resourcesVpcConfig = resources_vpc_config
+        self.compute = compute
+        self.conn_id = conn_id
+        self.region = region
+
+        if self.compute == 'nodegroup':
+            self.nodegroupName = nodegroup_name or self.clusterName + DEFAULT_NODEGROUP_NAME_SUFFIX
+            if nodegroup_role_arn:
+                self.nodegroupRoleArn = nodegroup_role_arn
+            else:
+                message = "Creating an EKS Managed Nodegroup requires 
nodegroup_role_arn to be passed in."
+                self.log.error(message)
+                raise AttributeError(message)
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        eks_hook.create_cluster(
+            name=self.clusterName,
+            roleArn=self.clusterRoleArn,
+            resourcesVpcConfig=self.resourcesVpcConfig,
+        )
+
+        if self.compute is not None:
+            self.log.info("Waiting for EKS Cluster to provision.  This will 
take some time.")
+
+            countdown = TIMEOUT_SECONDS
+            while eks_hook.get_cluster_state(clusterName=self.clusterName) != 
"ACTIVE":
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    self.log.info(
+                        "Waiting for cluster to start.  Checking again in %d 
seconds", CHECK_INTERVAL_SECONDS
+                    )
+                    sleep(CHECK_INTERVAL_SECONDS)
+                else:
+                    message = "Cluster is still inactive after the allocated 
time limit.  Aborting."

Review comment:
       Should we tear it down in this case?
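   
   e.g. a possible shape for that, reusing the hook's own `delete_cluster` (hypothetical cleanup, not part of the current diff):
   
   ```python
   message = "Cluster is still inactive after the allocated time limit.  Aborting."
   self.log.error(message)
   # Hypothetical: delete the half-provisioned control plane so it is
   # not left running (and billing) after the task fails.
   eks_hook.delete_cluster(name=self.clusterName)
   raise RuntimeError(message)
   ```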

##########
File path: airflow/providers/amazon/aws/operators/eks.py
##########
@@ -0,0 +1,737 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""This module contains Amazon EKS operators."""
+import json
+import os
+from datetime import datetime
+from time import sleep
+from typing import Dict, List, Optional
+
+from boto3 import Session
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eks import DEFAULT_PAGINATION_TOKEN, DEFAULT_RESULTS_PER_PAGE, EksHook
+from airflow.providers.amazon.aws.utils.eks_kube_config import (
+    DEFAULT_CONTEXT_NAME,
+    DEFAULT_KUBE_CONFIG_PATH,
+    DEFAULT_NAMESPACE_NAME,
+    DEFAULT_POD_USERNAME,
+    generate_config_file,
+)
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+
+CHECK_INTERVAL_SECONDS = 15
+TIMEOUT_SECONDS = 25 * 60
+CONN_ID = "eks"
+REGION = Session().region_name
+DEFAULT_COMPUTE_TYPE = 'nodegroup'
+DEFAULT_NODEGROUP_NAME_SUFFIX = '-nodegroup'
+DEFAULT_POD_NAME = 'pod'
+KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'
+
+
+class EKSCreateClusterOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Cluster control plane.
+
+    Optionally, can also create the supporting compute architecture:
+    If argument 'compute' is provided with a value of 'nodegroup', will also attempt to create an Amazon
+    EKS Managed Nodegroup for the cluster.  See EKSCreateNodegroupOperator documentation for requirements.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSCreateClusterOperator`
+
+    :param cluster_name: The unique name to give to your Amazon EKS Cluster.
+    :type cluster_name: str
+    :param cluster_role_arn: The Amazon Resource Name (ARN) of the IAM role that provides permissions for the
+       Kubernetes control plane to make calls to AWS API operations on your behalf.
+    :type cluster_role_arn: str
+    :param resources_vpc_config: The VPC configuration used by the cluster control plane.
+    :type resources_vpc_config: Dict
+    :param compute: The type of compute architecture to generate along with the cluster.
+        Defaults to 'nodegroup' to generate an EKS Managed Nodegroup.
+    :type compute: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    If 'compute' is 'nodegroup', the following are required:
+
+    :param nodegroup_name: The unique name to give your EKS Managed Nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_role_arn: The Amazon Resource Name (ARN) of the IAM role to associate
+         with the EKS Managed Nodegroup.
+    :type nodegroup_role_arn: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        cluster_role_arn: str,
+        resources_vpc_config: Dict,
+        nodegroup_name: Optional[str] = None,
+        nodegroup_role_arn: Optional[str] = None,
+        compute: Optional[str] = DEFAULT_COMPUTE_TYPE,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.clusterRoleArn = cluster_role_arn
+        self.resourcesVpcConfig = resources_vpc_config
+        self.compute = compute
+        self.conn_id = conn_id
+        self.region = region
+
+        if self.compute == 'nodegroup':
+            self.nodegroupName = nodegroup_name or self.clusterName + DEFAULT_NODEGROUP_NAME_SUFFIX
+            if nodegroup_role_arn:
+                self.nodegroupRoleArn = nodegroup_role_arn
+            else:
+                message = "Creating an EKS Managed Nodegroup requires 
nodegroup_role_arn to be passed in."
+                self.log.error(message)
+                raise AttributeError(message)
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        eks_hook.create_cluster(
+            name=self.clusterName,
+            roleArn=self.clusterRoleArn,
+            resourcesVpcConfig=self.resourcesVpcConfig,
+        )
+
+        if self.compute is not None:
+            self.log.info("Waiting for EKS Cluster to provision.  This will 
take some time.")
+
+            countdown = TIMEOUT_SECONDS
+            while eks_hook.get_cluster_state(clusterName=self.clusterName) != 
"ACTIVE":
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    self.log.info(
+                        "Waiting for cluster to start.  Checking again in %d 
seconds", CHECK_INTERVAL_SECONDS
+                    )
+                    sleep(CHECK_INTERVAL_SECONDS)
+                else:
+                    message = "Cluster is still inactive after the allocated 
time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        if self.compute == 'nodegroup':
+            eks_hook.create_nodegroup(
+                clusterName=self.clusterName,
+                nodegroupName=self.nodegroupName,
+                subnets=self.resourcesVpcConfig.get('subnetIds'),
+                nodeRole=self.nodegroupRoleArn,
+            )
+
+
+class EKSCreateNodegroupOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Managed Nodegroup for an existing Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSCreateNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to create the managed nodegroup in.
+    :type cluster_name: str
+    :param nodegroup_name: The unique name to give your managed nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_subnets:
+        The subnets to use for the Auto Scaling group that is created for the managed nodegroup.
+    :type nodegroup_subnets: List[str]
+    :param nodegroup_role_arn:
+        The Amazon Resource Name (ARN) of the IAM role to associate with the managed nodegroup.
+    :type nodegroup_role_arn: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_subnets: List[str],
+        nodegroup_role_arn: str,
+        nodegroup_name: Optional[str],
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupSubnets = nodegroup_subnets
+        self.nodegroupRoleArn = nodegroup_role_arn
+        self.nodegroupName = nodegroup_name or cluster_name + datetime.now().strftime("%Y%m%d_%H%M%S")
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.create_nodegroup(
+            clusterName=self.clusterName,
+            nodegroupName=self.nodegroupName,
+            subnets=self.nodegroupSubnets,
+            nodeRole=self.nodegroupRoleArn,
+        )
+
+
+class EKSDeleteClusterOperator(BaseOperator):
+    """
+    Deletes the Amazon EKS Cluster control plane and all nodegroups attached to it.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSDeleteClusterOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to delete.
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self, cluster_name: str, conn_id: Optional[str] = CONN_ID, region: Optional[str] = REGION, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        nodegroups = eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')
+        nodegroup_count = len(nodegroups)
+        if nodegroup_count > 0:
+            self.log.info(
+                "A cluster can not be deleted with attached nodegroups.  Deleting %d nodegroups.",
+                nodegroup_count,
+            )
+            for group in nodegroups:
+                eks_hook.delete_nodegroup(clusterName=self.clusterName, nodegroupName=group)
+
+            # Scaling up the timeout based on the number of nodegroups that are being processed.
+            additional_seconds = 5 * 60
+            countdown = TIMEOUT_SECONDS + (nodegroup_count * additional_seconds)
+            while len(eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')) > 0:

Review comment:
       ```suggestion
               while eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups'):
   ```
   
   No need to check the length, just see if it's "truthy"

##########
File path: airflow/providers/amazon/aws/operators/eks.py
##########
@@ -0,0 +1,737 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""This module contains Amazon EKS operators."""
+import json
+import os
+from datetime import datetime
+from time import sleep
+from typing import Dict, List, Optional
+
+from boto3 import Session
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eks import DEFAULT_PAGINATION_TOKEN, DEFAULT_RESULTS_PER_PAGE, EksHook
+from airflow.providers.amazon.aws.utils.eks_kube_config import (
+    DEFAULT_CONTEXT_NAME,
+    DEFAULT_KUBE_CONFIG_PATH,
+    DEFAULT_NAMESPACE_NAME,
+    DEFAULT_POD_USERNAME,
+    generate_config_file,
+)
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+
+CHECK_INTERVAL_SECONDS = 15
+TIMEOUT_SECONDS = 25 * 60
+CONN_ID = "eks"
+REGION = Session().region_name
+DEFAULT_COMPUTE_TYPE = 'nodegroup'
+DEFAULT_NODEGROUP_NAME_SUFFIX = '-nodegroup'
+DEFAULT_POD_NAME = 'pod'
+KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'
+
+
+class EKSCreateClusterOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Cluster control plane.
+
+    Optionally, can also create the supporting compute architecture:
+    If argument 'compute' is provided with a value of 'nodegroup', will also attempt to create an Amazon
+    EKS Managed Nodegroup for the cluster.  See EKSCreateNodegroupOperator documentation for requirements.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSCreateClusterOperator`
+
+    :param cluster_name: The unique name to give to your Amazon EKS Cluster.
+    :type cluster_name: str
+    :param cluster_role_arn: The Amazon Resource Name (ARN) of the IAM role that provides permissions for the
+       Kubernetes control plane to make calls to AWS API operations on your behalf.
+    :type cluster_role_arn: str
+    :param resources_vpc_config: The VPC configuration used by the cluster control plane.
+    :type resources_vpc_config: Dict
+    :param compute: The type of compute architecture to generate along with the cluster.
+        Defaults to 'nodegroup' to generate an EKS Managed Nodegroup.
+    :type compute: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    If 'compute' is 'nodegroup', the following are required:
+
+    :param nodegroup_name: The unique name to give your EKS Managed Nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_role_arn: The Amazon Resource Name (ARN) of the IAM role to associate
+         with the EKS Managed Nodegroup.
+    :type nodegroup_role_arn: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        cluster_role_arn: str,
+        resources_vpc_config: Dict,
+        nodegroup_name: Optional[str] = None,
+        nodegroup_role_arn: Optional[str] = None,
+        compute: Optional[str] = DEFAULT_COMPUTE_TYPE,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.clusterRoleArn = cluster_role_arn
+        self.resourcesVpcConfig = resources_vpc_config
+        self.compute = compute
+        self.conn_id = conn_id
+        self.region = region
+
+        if self.compute == 'nodegroup':
+            self.nodegroupName = nodegroup_name or self.clusterName + DEFAULT_NODEGROUP_NAME_SUFFIX
+            if nodegroup_role_arn:
+                self.nodegroupRoleArn = nodegroup_role_arn
+            else:
+                message = "Creating an EKS Managed Nodegroup requires 
nodegroup_role_arn to be passed in."
+                self.log.error(message)
+                raise AttributeError(message)
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        eks_hook.create_cluster(
+            name=self.clusterName,
+            roleArn=self.clusterRoleArn,
+            resourcesVpcConfig=self.resourcesVpcConfig,
+        )
+
+        if self.compute is not None:
+            self.log.info("Waiting for EKS Cluster to provision.  This will 
take some time.")
+
+            countdown = TIMEOUT_SECONDS
+            while eks_hook.get_cluster_state(clusterName=self.clusterName) != 
"ACTIVE":
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    self.log.info(
+                        "Waiting for cluster to start.  Checking again in %d 
seconds", CHECK_INTERVAL_SECONDS
+                    )
+                    sleep(CHECK_INTERVAL_SECONDS)
+                else:
+                    message = "Cluster is still inactive after the allocated 
time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        if self.compute == 'nodegroup':
+            eks_hook.create_nodegroup(
+                clusterName=self.clusterName,
+                nodegroupName=self.nodegroupName,
+                subnets=self.resourcesVpcConfig.get('subnetIds'),
+                nodeRole=self.nodegroupRoleArn,
+            )
+
+
+class EKSCreateNodegroupOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Managed Nodegroup for an existing Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSCreateNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to create the managed nodegroup in.
+    :type cluster_name: str
+    :param nodegroup_name: The unique name to give your managed nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_subnets:
+        The subnets to use for the Auto Scaling group that is created for the managed nodegroup.
+    :type nodegroup_subnets: List[str]
+    :param nodegroup_role_arn:
+        The Amazon Resource Name (ARN) of the IAM role to associate with the managed nodegroup.
+    :type nodegroup_role_arn: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_subnets: List[str],
+        nodegroup_role_arn: str,
+        nodegroup_name: Optional[str],
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupSubnets = nodegroup_subnets
+        self.nodegroupRoleArn = nodegroup_role_arn
+        self.nodegroupName = nodegroup_name or cluster_name + datetime.now().strftime("%Y%m%d_%H%M%S")
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.create_nodegroup(
+            clusterName=self.clusterName,
+            nodegroupName=self.nodegroupName,
+            subnets=self.nodegroupSubnets,
+            nodeRole=self.nodegroupRoleArn,
+        )
+
+
+class EKSDeleteClusterOperator(BaseOperator):
+    """
+    Deletes the Amazon EKS Cluster control plane and all nodegroups attached to it.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSDeleteClusterOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to delete.
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self, cluster_name: str, conn_id: Optional[str] = CONN_ID, region: Optional[str] = REGION, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        nodegroups = eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')
+        nodegroup_count = len(nodegroups)
+        if nodegroup_count > 0:
+            self.log.info(
+                "A cluster can not be deleted with attached nodegroups.  Deleting %d nodegroups.",
+                nodegroup_count,
+            )
+            for group in nodegroups:
+                eks_hook.delete_nodegroup(clusterName=self.clusterName, nodegroupName=group)
+
+            # Scaling up the timeout based on the number of nodegroups that are being processed.
+            additional_seconds = 5 * 60
+            countdown = TIMEOUT_SECONDS + (nodegroup_count * additional_seconds)
+            while len(eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')) > 0:
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    sleep(CHECK_INTERVAL_SECONDS)
+                    self.log.info(
+                        "Waiting for the remaining %s nodegroups to delete.  Checking again in %d seconds.",
+                        nodegroup_count,
+                        CHECK_INTERVAL_SECONDS,
+                    )
+                else:
+                    message = "Nodegroups are still inactive after the allocated time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        self.log.info("No nodegroups remain, deleting cluster.")
+        return eks_hook.delete_cluster(name=self.clusterName)
+
+
+class EKSDeleteNodegroupOperator(BaseOperator):
+    """
+    Deletes an Amazon EKS Nodegroup from an Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSDeleteNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster that is associated with your nodegroup.
+    :type cluster_name: str
+    :param nodegroup_name: The name of the nodegroup to delete.
+    :type nodegroup_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_name: str,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupName = nodegroup_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.delete_nodegroup(clusterName=self.clusterName, nodegroupName=self.nodegroupName)
+
+
+class EKSDescribeAllClustersOperator(BaseOperator):

Review comment:
       Can you think of a case where having this as an _operator_ to use in a DAG is actually useful?
   
   My gut says that _this_ one doesn't need to be an operator.
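   
   For comparison, anyone who needs that data could pull it straight from the hook inside a task; a sketch with the TaskFlow API (the boto3 calls are real, the task itself is hypothetical):
   
   ```python
   from airflow.decorators import task
   
   @task
   def describe_all_clusters():
       # The hook's boto3 client already exposes what this operator would wrap.
       client = EksHook(aws_conn_id='eks').get_conn()
       names = client.list_clusters()['clusters']
       return [client.describe_cluster(name=n)['cluster'] for n in names]
   ```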

##########
File path: airflow/providers/amazon/aws/operators/eks.py
##########
@@ -0,0 +1,737 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""This module contains Amazon EKS operators."""
+import json
+import os
+from datetime import datetime
+from time import sleep
+from typing import Dict, List, Optional
+
+from boto3 import Session
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eks import DEFAULT_PAGINATION_TOKEN, DEFAULT_RESULTS_PER_PAGE, EksHook
+from airflow.providers.amazon.aws.utils.eks_kube_config import (
+    DEFAULT_CONTEXT_NAME,
+    DEFAULT_KUBE_CONFIG_PATH,
+    DEFAULT_NAMESPACE_NAME,
+    DEFAULT_POD_USERNAME,
+    generate_config_file,
+)
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+
+CHECK_INTERVAL_SECONDS = 15
+TIMEOUT_SECONDS = 25 * 60
+CONN_ID = "eks"
+REGION = Session().region_name
+DEFAULT_COMPUTE_TYPE = 'nodegroup'
+DEFAULT_NODEGROUP_NAME_SUFFIX = '-nodegroup'
+DEFAULT_POD_NAME = 'pod'
+KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'
+
+
+class EKSCreateClusterOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Cluster control plane.
+
+    Optionally, can also create the supporting compute architecture:
+    If argument 'compute' is provided with a value of 'nodegroup', will also attempt to create an Amazon
+    EKS Managed Nodegroup for the cluster.  See EKSCreateNodegroupOperator documentation for requirements.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EKSCreateClusterOperator`
+
+    :param cluster_name: The unique name to give to your Amazon EKS Cluster.
+    :type cluster_name: str
+    :param cluster_role_arn: The Amazon Resource Name (ARN) of the IAM role that provides permissions for the
+       Kubernetes control plane to make calls to AWS API operations on your behalf.
+    :type cluster_role_arn: str
+    :param resources_vpc_config: The VPC configuration used by the cluster control plane.
+    :type resources_vpc_config: Dict
+    :param compute: The type of compute architecture to generate along with the cluster.
+        Defaults to 'nodegroup' to generate an EKS Managed Nodegroup.
+    :type compute: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    If 'compute' is 'nodegroup', the following are required:
+
+    :param nodegroup_name: The unique name to give your EKS Managed Nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_role_arn: The Amazon Resource Name (ARN) of the IAM role 
to associate
+         with the EKS Managed Nodegroup.
+    :type nodegroup_role_arn: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        cluster_role_arn: str,
+        resources_vpc_config: Dict,
+        nodegroup_name: Optional[str] = None,
+        nodegroup_role_arn: Optional[str] = None,
+        compute: Optional[str] = DEFAULT_COMPUTE_TYPE,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.clusterRoleArn = cluster_role_arn
+        self.resourcesVpcConfig = resources_vpc_config
+        self.compute = compute
+        self.conn_id = conn_id
+        self.region = region
+
+        if self.compute == 'nodegroup':
+            self.nodegroupName = nodegroup_name or self.clusterName + 
DEFAULT_NODEGROUP_NAME_SUFFIX
+            if nodegroup_role_arn:
+                self.nodegroupRoleArn = nodegroup_role_arn
+            else:
+                message = "Creating an EKS Managed Nodegroup requires 
nodegroup_role_arn to be passed in."
+                self.log.error(message)
+                raise AttributeError(message)
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        eks_hook.create_cluster(
+            name=self.clusterName,
+            roleArn=self.clusterRoleArn,
+            resourcesVpcConfig=self.resourcesVpcConfig,
+        )
+
+        if self.compute is not None:
+            self.log.info("Waiting for EKS Cluster to provision.  This will 
take some time.")
+
+            countdown = TIMEOUT_SECONDS
+            while eks_hook.get_cluster_state(clusterName=self.clusterName) != 
"ACTIVE":
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    self.log.info(
+                        "Waiting for cluster to start.  Checking again in %d 
seconds", CHECK_INTERVAL_SECONDS
+                    )
+                    sleep(CHECK_INTERVAL_SECONDS)
+                else:
+                    message = "Cluster is still inactive after the allocated 
time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        if self.compute == 'nodegroup':
+            eks_hook.create_nodegroup(
+                clusterName=self.clusterName,
+                nodegroupName=self.nodegroupName,
+                subnets=self.resourcesVpcConfig.get('subnetIds'),
+                nodeRole=self.nodegroupRoleArn,
+            )
+
+
+class EKSCreateNodegroupOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Managed Nodegroup for an existing Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSCreateNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to create the 
managed nodegroup in.
+    :type cluster_name: str
+    :param nodegroup_name: The unique name to give your managed nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_subnets:
+        The subnets to use for the Auto Scaling group that is created for the 
managed nodegroup.
+    :type nodegroup_subnets: List[str]
+    :param nodegroup_role_arn:
+        The Amazon Resource Name (ARN) of the IAM role to associate with the 
managed nodegroup.
+    :type nodegroup_role_arn: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_subnets: List[str],
+        nodegroup_role_arn: str,
+        nodegroup_name: Optional[str] = None,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupSubnets = nodegroup_subnets
+        self.nodegroupRoleArn = nodegroup_role_arn
+        self.nodegroupName = nodegroup_name or cluster_name + 
datetime.now().strftime("%Y%m%d_%H%M%S")
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.create_nodegroup(
+            clusterName=self.clusterName,
+            nodegroupName=self.nodegroupName,
+            subnets=self.nodegroupSubnets,
+            nodeRole=self.nodegroupRoleArn,
+        )
+
+
+class EKSDeleteClusterOperator(BaseOperator):
+    """
+    Deletes the Amazon EKS Cluster control plane and all nodegroups attached 
to it.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDeleteClusterOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to delete.
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self, cluster_name: str, conn_id: Optional[str] = CONN_ID, region: 
Optional[str] = REGION, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        nodegroups = 
eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')
+        nodegroup_count = len(nodegroups)
+        if nodegroup_count > 0:
+            self.log.info(
+                "A cluster can not be deleted with attached nodegroups.  
Deleting %d nodegroups.",
+                nodegroup_count,
+            )
+            for group in nodegroups:
+                eks_hook.delete_nodegroup(clusterName=self.clusterName, 
nodegroupName=group)
+
+            # Scaling up the timeout based on the number of nodegroups that 
are being processed.
+            additional_seconds = 5 * 60
+            countdown = TIMEOUT_SECONDS + (nodegroup_count * 
additional_seconds)
+            while 
len(eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')) > 
0:
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    sleep(CHECK_INTERVAL_SECONDS)
+                    self.log.info(
+                        "Waiting for the remaining %s nodegroups to delete.  
Checking again in %d seconds.",
+                        nodegroup_count,
+                        CHECK_INTERVAL_SECONDS,
+                    )
+                else:
+                    message = "Nodegroups are still inactive after the 
allocated time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        self.log.info("No nodegroups remain, deleting cluster.")
+        return eks_hook.delete_cluster(name=self.clusterName)
+
+
+class EKSDeleteNodegroupOperator(BaseOperator):
+    """
+    Deletes an Amazon EKS Nodegroup from an Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDeleteNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster that is associated 
with your nodegroup.
+    :type cluster_name: str
+    :param nodegroup_name: The name of the nodegroup to delete.
+    :type nodegroup_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_name: str,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupName = nodegroup_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.delete_nodegroup(clusterName=self.clusterName, 
nodegroupName=self.nodegroupName)
+
+
+class EKSDescribeAllClustersOperator(BaseOperator):

Review comment:
       Can you think of a case where having this as an _operator_ to use in a 
DAG is actually useful?
   
   My gut says that all these Describe* ones don't need to be operators.
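   
   For instance, anything these Describe*/List* operators return is already one hook call away inside a `PythonOperator` -- a rough sketch (the connection id, region and task name here are illustrative, not from this PR, and it assumes the hook's paging defaults):
   
   ```python
   from airflow.operators.python import PythonOperator
   from airflow.providers.amazon.aws.hooks.eks import EksHook
   
   
   def _describe_all_clusters():
       # The hook exposes the same API surface the operator wraps,
       # so a user can fetch and post-process the data inline.
       hook = EksHook(aws_conn_id='aws_default', region_name='us-east-1')
       return hook.list_clusters()
   
   
   # Inside a DAG definition:
   describe_clusters = PythonOperator(
       task_id='describe_all_clusters',
       python_callable=_describe_all_clusters,
   )
   ```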

##########
File path: airflow/providers/amazon/aws/operators/eks.py
##########
@@ -0,0 +1,737 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""This module contains Amazon EKS operators."""
+import json
+import os
+from datetime import datetime
+from time import sleep
+from typing import Dict, List, Optional
+
+from boto3 import Session
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eks import DEFAULT_PAGINATION_TOKEN, 
DEFAULT_RESULTS_PER_PAGE, EksHook
+from airflow.providers.amazon.aws.utils.eks_kube_config import (
+    DEFAULT_CONTEXT_NAME,
+    DEFAULT_KUBE_CONFIG_PATH,
+    DEFAULT_NAMESPACE_NAME,
+    DEFAULT_POD_USERNAME,
+    generate_config_file,
+)
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import 
KubernetesPodOperator
+
+CHECK_INTERVAL_SECONDS = 15
+TIMEOUT_SECONDS = 25 * 60
+CONN_ID = "eks"
+REGION = Session().region_name
+DEFAULT_COMPUTE_TYPE = 'nodegroup'
+DEFAULT_NODEGROUP_NAME_SUFFIX = '-nodegroup'
+DEFAULT_POD_NAME = 'pod'
+KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'
+
+
+class EKSCreateClusterOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Cluster control plane.
+
+    Optionally, can also create the supporting compute architecture:
+    If argument 'compute' is provided with a value of 'nodegroup', will also 
attempt to create an Amazon
+    EKS Managed Nodegroup for the cluster.  See EKSCreateNodegroupOperator 
documentation for requirements.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSCreateClusterOperator`
+
+    :param cluster_name: The unique name to give to your Amazon EKS Cluster.
+    :type cluster_name: str
+    :param cluster_role_arn: The Amazon Resource Name (ARN) of the IAM role 
that provides permissions for the
+       Kubernetes control plane to make calls to AWS API operations on your 
behalf.
+    :type cluster_role_arn: str
+    :param resources_vpc_config: The VPC configuration used by the cluster 
control plane.
+    :type resources_vpc_config: Dict
+    :param compute: The type of compute architecture to generate along with 
the cluster.
+        Defaults to 'nodegroup' to generate an EKS Managed Nodegroup.
+    :type compute: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    If 'compute' is 'nodegroup', the following are required:
+
+    :param nodegroup_name: The unique name to give your EKS Managed Nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_role_arn: The Amazon Resource Name (ARN) of the IAM role 
to associate
+         with the EKS Managed Nodegroup.
+    :type nodegroup_role_arn: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        cluster_role_arn: str,
+        resources_vpc_config: Dict,
+        nodegroup_name: Optional[str] = None,
+        nodegroup_role_arn: Optional[str] = None,
+        compute: Optional[str] = DEFAULT_COMPUTE_TYPE,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.clusterRoleArn = cluster_role_arn
+        self.resourcesVpcConfig = resources_vpc_config
+        self.compute = compute
+        self.conn_id = conn_id
+        self.region = region
+
+        if self.compute == 'nodegroup':
+            self.nodegroupName = nodegroup_name or self.clusterName + 
DEFAULT_NODEGROUP_NAME_SUFFIX
+            if nodegroup_role_arn:
+                self.nodegroupRoleArn = nodegroup_role_arn
+            else:
+                message = "Creating an EKS Managed Nodegroup requires 
nodegroup_role_arn to be passed in."
+                self.log.error(message)
+                raise AttributeError(message)
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        eks_hook.create_cluster(
+            name=self.clusterName,
+            roleArn=self.clusterRoleArn,
+            resourcesVpcConfig=self.resourcesVpcConfig,
+        )
+
+        if self.compute is not None:
+            self.log.info("Waiting for EKS Cluster to provision.  This will 
take some time.")
+
+            countdown = TIMEOUT_SECONDS
+            while eks_hook.get_cluster_state(clusterName=self.clusterName) != 
"ACTIVE":
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    self.log.info(
+                        "Waiting for cluster to start.  Checking again in %d 
seconds", CHECK_INTERVAL_SECONDS
+                    )
+                    sleep(CHECK_INTERVAL_SECONDS)
+                else:
+                    message = "Cluster is still inactive after the allocated 
time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        if self.compute == 'nodegroup':
+            eks_hook.create_nodegroup(
+                clusterName=self.clusterName,
+                nodegroupName=self.nodegroupName,
+                subnets=self.resourcesVpcConfig.get('subnetIds'),
+                nodeRole=self.nodegroupRoleArn,
+            )
+
+
+class EKSCreateNodegroupOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Managed Nodegroup for an existing Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSCreateNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to create the 
managed nodegroup in.
+    :type cluster_name: str
+    :param nodegroup_name: The unique name to give your managed nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_subnets:
+        The subnets to use for the Auto Scaling group that is created for the 
managed nodegroup.
+    :type nodegroup_subnets: List[str]
+    :param nodegroup_role_arn:
+        The Amazon Resource Name (ARN) of the IAM role to associate with the 
managed nodegroup.
+    :type nodegroup_role_arn: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_subnets: List[str],
+        nodegroup_role_arn: str,
+        nodegroup_name: Optional[str] = None,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupSubnets = nodegroup_subnets
+        self.nodegroupRoleArn = nodegroup_role_arn
+        self.nodegroupName = nodegroup_name or cluster_name + 
datetime.now().strftime("%Y%m%d_%H%M%S")
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.create_nodegroup(
+            clusterName=self.clusterName,
+            nodegroupName=self.nodegroupName,
+            subnets=self.nodegroupSubnets,
+            nodeRole=self.nodegroupRoleArn,
+        )
+
+
+class EKSDeleteClusterOperator(BaseOperator):
+    """
+    Deletes the Amazon EKS Cluster control plane and all nodegroups attached 
to it.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDeleteClusterOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to delete.
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self, cluster_name: str, conn_id: Optional[str] = CONN_ID, region: 
Optional[str] = REGION, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        nodegroups = 
eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')
+        nodegroup_count = len(nodegroups)
+        if nodegroup_count > 0:
+            self.log.info(
+                "A cluster can not be deleted with attached nodegroups.  
Deleting %d nodegroups.",
+                nodegroup_count,
+            )
+            for group in nodegroups:
+                eks_hook.delete_nodegroup(clusterName=self.clusterName, 
nodegroupName=group)
+
+            # Scaling up the timeout based on the number of nodegroups that 
are being processed.
+            additional_seconds = 5 * 60
+            countdown = TIMEOUT_SECONDS + (nodegroup_count * 
additional_seconds)
+            while 
len(eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')) > 
0:
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    sleep(CHECK_INTERVAL_SECONDS)
+                    self.log.info(
+                        "Waiting for the remaining %s nodegroups to delete.  
Checking again in %d seconds.",
+                        nodegroup_count,
+                        CHECK_INTERVAL_SECONDS,
+                    )
+                else:
+                    message = "Nodegroups are still inactive after the 
allocated time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        self.log.info("No nodegroups remain, deleting cluster.")
+        return eks_hook.delete_cluster(name=self.clusterName)
+
+
+class EKSDeleteNodegroupOperator(BaseOperator):
+    """
+    Deletes an Amazon EKS Nodegroup from an Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDeleteNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster that is associated 
with your nodegroup.
+    :type cluster_name: str
+    :param nodegroup_name: The name of the nodegroup to delete.
+    :type nodegroup_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_name: str,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupName = nodegroup_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.delete_nodegroup(clusterName=self.clusterName, 
nodegroupName=self.nodegroupName)
+
+
+class EKSDescribeAllClustersOperator(BaseOperator):
+    """
+    Describes all Amazon EKS Clusters in your AWS account.
+
+    :param max_results: The maximum number of results to return.
+    :type max_results: int
+    :param next_token: The nextToken value returned from a previous paginated 
execution.
+    :type next_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+    :param verbose: Provides additional logging if set to True.  Defaults to 
False.
+    :type verbose: bool
+
+    """
+
+    def __init__(
+        self,
+        max_results: Optional[int] = DEFAULT_RESULTS_PER_PAGE,
+        next_token: Optional[str] = DEFAULT_PAGINATION_TOKEN,
+        verbose: Optional[bool] = False,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.maxResults = max_results
+        self.nextToken = next_token
+        self.verbose = verbose
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        response = eks_hook.list_clusters(
+            verbose=self.verbose, maxResults=self.maxResults, 
nextToken=self.nextToken
+        )
+        cluster_list = response.get('clusters')
+        next_token = response.get('nextToken')
+
+        result = []
+        for cluster in cluster_list:
+            full_describe = json.loads(eks_hook.describe_cluster(name=cluster))
+            cluster_details = json.dumps(full_describe.get('cluster'))
+            result.append(cluster_details)
+
+        if self.verbose is True:
+            self.log.info("\n\t".join(["Cluster Details:"] + result))
+
+        return {'nextToken': next_token, 'clusters': result}
+
+
+class EKSDescribeAllNodegroupsOperator(BaseOperator):
+    """
+    Describes all Amazon EKS Nodegroups associated with the specified EKS 
Cluster.
+
+    :param max_results: The maximum number of results to return.
+    :type max_results: int
+    :param next_token: The nextToken value returned from a previous paginated 
execution.
+    :type next_token: str
+    :param cluster_name: The name of the Amazon EKS Cluster to check.
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+    :param verbose: Provides additional logging if set to True.  Defaults to 
False.
+    :type verbose: bool
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        max_results: Optional[int] = DEFAULT_RESULTS_PER_PAGE,
+        next_token: Optional[str] = DEFAULT_PAGINATION_TOKEN,
+        verbose: Optional[bool] = False,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.maxResults = max_results
+        self.nextToken = next_token
+        self.verbose = verbose
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        response = eks_hook.list_nodegroups(
+            clusterName=self.clusterName,
+            verbose=self.verbose,
+            maxResults=self.maxResults,
+            nextToken=self.nextToken,
+        )
+        nodegroup_list = response.get('nodegroups')
+        next_token = response.get('nextToken')
+
+        result = []
+        for nodegroup in nodegroup_list:
+            full_describe = json.loads(
+                eks_hook.describe_nodegroup(clusterName=self.clusterName, 
nodegroupName=nodegroup)
+            )
+            nodegroup_details = json.dumps(full_describe.get('nodegroup'))
+            result.append(nodegroup_details)
+
+        if self.verbose is True:
+            self.log.info("\n\t".join(["Nodegroup Details:"] + result))
+
+        return {'nextToken': next_token, 'nodegroups': result}
+
+
+class EKSDescribeClusterOperator(BaseOperator):
+    """
+    Returns descriptive information about an Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDescribeClusterOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to describe.
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+    :param verbose: Provides additional logging if set to True.  Defaults to 
False.
+    :type verbose: bool
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        verbose: Optional[bool] = False,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.verbose = verbose
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        response = eks_hook.describe_cluster(name=self.clusterName, 
verbose=self.verbose)
+        response_json = json.loads(response)
+        # Extract the cluster data, drop the request metadata
+        cluster_data = response_json.get('cluster')
+        return json.dumps(cluster_data)
+
+
+class EKSDescribeNodegroupOperator(BaseOperator):
+    """
+    Returns descriptive information about the Amazon EKS Nodegroup.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDescribeNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster associated with 
the nodegroup.
+    :type cluster_name: str
+    :param nodegroup_name: The name of the Amazon EKS Nodegroup to describe.
+    :type nodegroup_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+    :param verbose: Provides additional logging if set to True.  Defaults to 
False.
+    :type verbose: bool
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_name: str,
+        verbose: Optional[bool] = False,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupName = nodegroup_name
+        self.verbose = verbose
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EksHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        response = eks_hook.describe_nodegroup(
+            clusterName=self.clusterName, nodegroupName=self.nodegroupName, 
verbose=self.verbose
+        )
+        response_json = json.loads(response)
+        # Extract the nodegroup data, drop the request metadata
+        nodegroup_data = response_json.get('nodegroup')
+        return json.dumps(nodegroup_data)
+
+
+class EKSListClustersOperator(BaseOperator):

Review comment:
       Same here actually -- I can't think of a case where having this appear 
in a DAG is actually useful.
   
   Did you have a use case/workflow in mind when adding these?
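   
   If the goal is just surfacing the list in a DAG, the TaskFlow form is already about as short as the operator call site would be -- a sketch (connection id illustrative, not from this PR):
   
   ```python
   from airflow.decorators import task
   from airflow.providers.amazon.aws.hooks.eks import EksHook
   
   
   @task
   def list_eks_clusters():
       # Two lines against the hook replace EKSListClustersOperator,
       # assuming the hook's paging defaults are acceptable.
       return EksHook(aws_conn_id='aws_default').list_clusters()
   ```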

##########
File path: airflow/providers/amazon/aws/utils/eks_kube_config.py
##########
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from shutil import which
+from typing import Optional
+
+import boto3
+import yaml
+
+HOME = os.environ.get('HOME', '/tmp')
+DEFAULT_KUBE_CONFIG_FILENAME = 'config'
+DEFAULT_KUBE_CONFIG_PATH = os.path.join(HOME, '.kube', DEFAULT_KUBE_CONFIG_FILENAME)
+DEFAULT_CONTEXT_NAME = 'aws'
+DEFAULT_NAMESPACE_NAME = 'default'
+DEFAULT_POD_USERNAME = 'aws'
+
+
+def generate_config_file(
+    eks_cluster_name: str,
+    eks_namespace_name: str,
+    aws_profile: Optional[str],
+    kube_config_file_location: Optional[str] = DEFAULT_KUBE_CONFIG_PATH,
+    pod_username: Optional[str] = DEFAULT_POD_USERNAME,
+    pod_context: Optional[str] = DEFAULT_CONTEXT_NAME,
+    role_arn: Optional[str] = None,
+    aws_region: Optional[str] = None,
+) -> None:
+    """
+    Writes the kubeconfig file given an EKS Cluster name, AWS region, and file 
path.
+
+    :param eks_cluster_name: The name of the Amazon EKS Cluster to generate a 
kubeconfig entry for.
+    :type eks_cluster_name: str
+    :param eks_namespace_name: The namespace to run within kubernetes.
+    :type eks_namespace_name: str
+    :param aws_profile: The named profile containing the credentials for the 
AWS CLI tool to use.
+    :type aws_profile: str
+    :param kube_config_file_location: Path to save the generated kube_config 
file to.
+    :type kube_config_file_location: str
+    :param pod_username: The username under which to execute the pod.
+    :type pod_username: str
+    :param pod_context: The name of the context access parameters to use.
+    :type pod_context: str
+    :param role_arn: The Amazon Resource Name (ARN) of the IAM role to assume 
when retrieving the cluster authentication token.
+    :type role_arn: str
+    :param aws_region: The name of the AWS Region the EKS Cluster resides in.
+    :type aws_region: str
+    """
+    installed = which("aws")
+    if installed is None:
+        message = (
+            "AWS CLI version 2 must be installed on the worker.  See: "
+            
"https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html";
+        )
+        print(message)
+        raise UnmetDependency(message)
+
+    # Set up the client
+    session = boto3.Session(region_name=aws_region, profile_name=aws_profile)
+    eks_client = session.client("eks")
+
+    # get cluster details
+    cluster = eks_client.describe_cluster(name=eks_cluster_name)
+    cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+    cluster_ep = cluster["cluster"]["endpoint"]
+
+    # build the cluster config hash
+    cli_args = [
+        "--region",
+        aws_region,
+        "eks",
+        "get-token",
+        "--cluster-name",
+        eks_cluster_name,
+    ]
+    if role_arn:
+        cli_args.extend(["--role-arn", role_arn])
+
+    cluster_config = {
+        "apiVersion": "v1",
+        "kind": "Config",
+        "clusters": [
+            {
+                "cluster": {"server": cluster_ep, 
"certificate-authority-data": cluster_cert},
+                "name": eks_cluster_name,
+            }
+        ],
+        "contexts": [
+            {
+                "context": {
+                    "cluster": eks_cluster_name,
+                    "namespace": eks_namespace_name,
+                    "user": pod_username,
+                },
+                "name": pod_context,
+            }
+        ],
+        "current-context": pod_context,
+        "preferences": {},
+        "users": [
+            {
+                "name": pod_username,
+                "user": {
+                    "exec": {
+                        "apiVersion": "client.authentication.k8s.io/v1alpha1",
+                        "args": cli_args,
+                        "command": "aws",
+                    }
+                },
+            }
+        ],
+    }
+
+    config_text = yaml.dump(cluster_config, default_flow_style=False)
+    with open(kube_config_file_location, "w") as config_file:
+        config_file.write(config_text)
+
+
+class UnmetDependency(BaseException):

Review comment:
       ```suggestion
   class UnmetDependency(Exception):
   ```
   
   You should almost never use BaseException.
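   
   A quick illustration of why (snippet is illustrative, not from this PR): a `BaseException` subclass sails straight past ordinary `except Exception` handlers, the same way `KeyboardInterrupt` and `SystemExit` do:
   
   ```python
   class UnmetDependency(BaseException):
       pass
   
   
   try:
       raise UnmetDependency("aws CLI missing")
   except Exception:
       # Never reached: UnmetDependency does not derive from Exception,
       # so retry/cleanup/logging code that catches Exception is skipped
       # and the error propagates all the way up.
       print("handled")
   ```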

##########
File path: airflow/providers/amazon/aws/utils/eks_kube_config.py
##########
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from shutil import which
+from typing import Optional
+
+import boto3
+import yaml
+
+HOME = os.environ.get('HOME', '/tmp')
+DEFAULT_KUBE_CONFIG_FILENAME = 'config'
+DEFAULT_KUBE_CONFIG_PATH = os.path.join(HOME, '.kube', DEFAULT_KUBE_CONFIG_FILENAME)
+DEFAULT_CONTEXT_NAME = 'aws'
+DEFAULT_NAMESPACE_NAME = 'default'
+DEFAULT_POD_USERNAME = 'aws'
+
+
+def generate_config_file(
+    eks_cluster_name: str,
+    eks_namespace_name: str,
+    aws_profile: Optional[str],
+    kube_config_file_location: Optional[str] = DEFAULT_KUBE_CONFIG_PATH,
+    pod_username: Optional[str] = DEFAULT_POD_USERNAME,
+    pod_context: Optional[str] = DEFAULT_CONTEXT_NAME,
+    role_arn: Optional[str] = None,
+    aws_region: Optional[str] = None,
+) -> None:
+    """
+    Writes the kubeconfig file given an EKS Cluster name, AWS region, and file 
path.
+
+    :param eks_cluster_name: The name of the Amazon EKS Cluster to generate a 
kubeconfig entry for.
+    :type eks_cluster_name: str
+    :param eks_namespace_name: The namespace to run within kubernetes.
+    :type eks_namespace_name: str
+    :param aws_profile: The named profile containing the credentials for the 
AWS CLI tool to use.
+    :type aws_profile: str
+    :param kube_config_file_location: Path to save the generated kube_config 
file to.
+    :type kube_config_file_location: str
+    :param pod_username: The username under which to execute the pod.
+    :type pod_username: str
+    :param pod_context: The name of the context access parameters to use.
+    :type pod_context: str
+    :param role_arn: The Amazon Resource Name (ARN) of the IAM role to assume 
when retrieving the cluster authentication token.
+    :type role_arn: str
+    :param aws_region: The name of the AWS Region the EKS Cluster resides in.
+    :type aws_region: str
+    """
+    installed = which("aws")
+    if installed is None:
+        message = (
+            "AWS CLI version 2 must be installed on the worker.  See: "

Review comment:
       Your test doesn't check for v2 specifically -- the pip-installable v1 of 
awscli would also pass this check.
   
   Do we _need_ v2 here?
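   
   If v2 genuinely is required, the check would need to parse the reported version rather than just test for the binary -- roughly (a sketch; note awscli v1 prints its version banner to stderr while v2 uses stdout):
   
   ```python
   import shutil
   import subprocess
   
   
   def assert_aws_cli_v2() -> None:
       """Raise unless an AWS CLI with major version 2 is on PATH (sketch)."""
       if shutil.which("aws") is None:
           raise RuntimeError("AWS CLI not found on PATH.")
       result = subprocess.run(["aws", "--version"], capture_output=True, text=True)
       # Output looks like "aws-cli/2.2.15 Python/3.8.8 ..."; v1 writes to stderr.
       banner = (result.stdout or result.stderr).split()
       version = banner[0].partition("/")[2] if banner else ""
       if not version.startswith("2."):
           raise RuntimeError(f"AWS CLI v2 required, found: {version or 'unknown'}")
   ```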
   
   

##########
File path: airflow/providers/amazon/aws/utils/eks_kube_config.py
##########
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from shutil import which
+from typing import Optional
+
+import boto3
+import yaml
+
+HOME = os.environ.get('HOME', '/tmp')
+DEFAULT_KUBE_CONFIG_FILENAME = 'config'
+DEFAULT_KUBE_CONFIG_PATH = os.path.join(HOME, '.kube', DEFAULT_KUBE_CONFIG_FILENAME)
+DEFAULT_CONTEXT_NAME = 'aws'
+DEFAULT_NAMESPACE_NAME = 'default'
+DEFAULT_POD_USERNAME = 'aws'
+
+
+def generate_config_file(
+    eks_cluster_name: str,
+    eks_namespace_name: str,
+    aws_profile: Optional[str],
+    kube_config_file_location: Optional[str] = DEFAULT_KUBE_CONFIG_PATH,
+    pod_username: Optional[str] = DEFAULT_POD_USERNAME,
+    pod_context: Optional[str] = DEFAULT_CONTEXT_NAME,
+    role_arn: Optional[str] = None,
+    aws_region: Optional[str] = None,
+) -> None:
+    """
+    Writes the kubeconfig file given an EKS Cluster name, AWS region, and file 
path.
+
+    :param eks_cluster_name: The name of the Amazon EKS Cluster to generate a 
kubeconfig entry for.
+    :type eks_cluster_name: str
+    :param eks_namespace_name: The namespace to run within kubernetes.
+    :type eks_namespace_name: str
+    :param aws_profile: The named profile containing the credentials for the 
AWS CLI tool to use.
+    :type aws_profile: str
+    :param kube_config_file_location: Path to save the generated kube_config 
file to.
+    :type kube_config_file_location: str
+    :param pod_username: The username under which to execute the pod.
+    :type pod_username: str
+    :param pod_context: The name of the context access parameters to use.
+    :type pod_context: str
+    :param role_arn: The Amazon Resource Name (ARN) of the IAM role to assume 
when retrieving the cluster authentication token.
+    :type role_arn: str
+    :param aws_region: The name of the AWS Region the EKS Cluster resides in.
+    :type aws_region: str
+    """
+    installed = which("aws")
+    if installed is None:
+        message = (
+            "AWS CLI version 2 must be installed on the worker.  See: "
+            
"https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html";
+        )
+        print(message)

Review comment:
       ```suggestion
   ```

##########
File path: airflow/providers/amazon/aws/utils/eks_kube_config.py
##########
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from shutil import which
+from typing import Optional
+
+import boto3
+import yaml
+
+HOME = os.environ.get('HOME', '/tmp')
+DEFAULT_KUBE_CONFIG_FILENAME = 'config'
+DEFAULT_KUBE_CONFIG_PATH = os.path.join(HOME, '.kube', DEFAULT_KUBE_CONFIG_FILENAME)
+DEFAULT_CONTEXT_NAME = 'aws'
+DEFAULT_NAMESPACE_NAME = 'default'
+DEFAULT_POD_USERNAME = 'aws'
+
+
+def generate_config_file(
+    eks_cluster_name: str,
+    eks_namespace_name: str,
+    aws_profile: Optional[str],
+    kube_config_file_location: Optional[str] = DEFAULT_KUBE_CONFIG_PATH,
+    pod_username: Optional[str] = DEFAULT_POD_USERNAME,
+    pod_context: Optional[str] = DEFAULT_CONTEXT_NAME,
+    role_arn: Optional[str] = None,
+    aws_region: Optional[str] = None,
+) -> None:
+    """
+    Writes the kubeconfig file given an EKS Cluster name, AWS region, and file 
path.
+
+    :param eks_cluster_name: The name of the Amazon EKS Cluster to generate a 
kubeconfig entry for.
+    :type eks_cluster_name: str
+    :param eks_namespace_name: The namespace to run within kubernetes.
+    :type eks_namespace_name: str
+    :param aws_profile: The named profile containing the credentials for the 
AWS CLI tool to use.
+    :type aws_profile: str
+    :param kube_config_file_location: Path to save the generated kube_config 
file to.
+    :type kube_config_file_location: str
+    :param pod_username: The username under which to execute the pod.
+    :type pod_username: str
+    :param pod_context: The name of the context access parameters to use.
+    :type pod_context: str
+    :param role_arn: The Amazon Resource Name (ARN) of the IAM role to assume 
when retrieving the cluster authentication token.
+    :type role_arn: str
+    :param aws_region: The name of the AWS Region the EKS Cluster resides in.
+    :type aws_region: str
+    """
+    installed = which("aws")
+    if installed is None:
+        message = (
+            "AWS CLI version 2 must be installed on the worker.  See: "
+            
"https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html";
+        )
+        print(message)
+        raise UnmetDependency(message)
+
+    # Set up the client
+    session = boto3.Session(region_name=aws_region, profile_name=aws_profile)
+    eks_client = session.client("eks")
+
+    # get cluster details
+    cluster = eks_client.describe_cluster(name=eks_cluster_name)
+    cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+    cluster_ep = cluster["cluster"]["endpoint"]
+
+    # build the cluster config hash
+    cli_args = [
+        "--region",
+        aws_region,
+        "eks",
+        "get-token",
+        "--cluster-name",
+        eks_cluster_name,
+    ]
+    if role_arn:
+        cli_args.extend(["--role-arn", role_arn])

Review comment:
       Is 
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/eks.html#EKS.Client.generate_presigned_url
 of any help here?
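   
   The EKS client itself can't mint a Kubernetes token, but presigning does remove the CLI dependency: the usual recipe is a presigned STS `GetCallerIdentity` request carrying the `x-k8s-aws-id` header, base64-encoded with a `k8s-aws-v1.` prefix. A rough sketch of that approach (not part of this PR):
   
   ```python
   import base64
   
   import boto3
   from botocore.signers import RequestSigner
   
   
   def get_eks_bearer_token(cluster_name: str, region: str) -> str:
       """Build a Kubernetes bearer token for an EKS cluster without the aws CLI."""
       session = boto3.session.Session()
       sts = session.client("sts", region_name=region)
       signer = RequestSigner(
           sts.meta.service_model.service_id,
           region,
           "sts",
           "v4",
           session.get_credentials(),
           session.events,
       )
       request = {
           "method": "GET",
           "url": f"https://sts.{region}.amazonaws.com/"
           "?Action=GetCallerIdentity&Version=2011-06-15",
           "body": {},
           "headers": {"x-k8s-aws-id": cluster_name},
           "context": {},
       }
       url = signer.generate_presigned_url(
           request, region_name=region, expires_in=60, operation_name=""
       )
       token = base64.urlsafe_b64encode(url.encode()).decode().rstrip("=")
       return "k8s-aws-v1." + token
   ```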

##########
File path: airflow/providers/amazon/aws/utils/eks_kube_config.py
##########
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from shutil import which
+from typing import Optional
+
+import boto3
+import yaml
+
+HOME = os.environ.get('HOME', '/tmp')
+DEFAULT_KUBE_CONFIG_FILENAME = 'config'
+DEFAULT_KUBE_CONFIG_PATH = os.path.join(HOME, '.kube', DEFAULT_KUBE_CONFIG_FILENAME)
+DEFAULT_CONTEXT_NAME = 'aws'
+DEFAULT_NAMESPACE_NAME = 'default'
+DEFAULT_POD_USERNAME = 'aws'
+
+
+def generate_config_file(

Review comment:
       This appears to be used only in one place, EKSPodOperator, so this 
possibly doesn't need to be in a separate file
   
   Perhaps this could be a method on the EksHook? (Given it connects to EKS and 
does describe cluster, I think that is the right place for it)
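   
   Something along these lines, assuming the hook keeps the usual AwsBaseHook plumbing (body abbreviated; only the relevant part shown):
   
   ```python
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   
   class EksHook(AwsBaseHook):
       def __init__(self, *args, **kwargs):
           kwargs["client_type"] = "eks"
           super().__init__(*args, **kwargs)
   
       def generate_config_file(self, eks_cluster_name: str, eks_namespace_name: str) -> None:
           # self.conn is the boto3 EKS client AwsBaseHook already manages,
           # so the standalone util's session/client setup goes away.
           cluster = self.conn.describe_cluster(name=eks_cluster_name)
           cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
           cluster_ep = cluster["cluster"]["endpoint"]
           ...  # build and write the kubeconfig exactly as the util does today
   ```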

##########
File path: setup.py
##########
@@ -500,7 +500,7 @@ def write_version(filename: str = os.path.join(*[my_dir, 
"airflow", "git_version
     'jira',
     'jsondiff',
     'mongomock',
-    'moto~=2.0',
+    'moto~=2.0.10',

Review comment:
       ```suggestion
       'moto~=2.0,>=2.0.10',
   ```
   
   `~=2.0.10`  is the same as `>=2.0.10,<2.1` which is more restrictive than we 
want I think.

##########
File path: tests/providers/amazon/aws/operators/test_eks.py
##########
@@ -0,0 +1,299 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from typing import List
+from unittest import mock
+
+from moto.eks.responses import DEFAULT_NEXT_TOKEN
+
+from airflow.providers.amazon.aws.hooks.eks import EksHook
+from airflow.providers.amazon.aws.operators.eks import (
+    EKSCreateClusterOperator,
+    EKSCreateNodegroupOperator,
+    EKSDeleteClusterOperator,
+    EKSDeleteNodegroupOperator,
+    EKSDescribeAllClustersOperator,
+    EKSDescribeAllNodegroupsOperator,
+    EKSDescribeClusterOperator,
+    EKSDescribeNodegroupOperator,
+    EKSListClustersOperator,
+    EKSListNodegroupsOperator,
+)
+from tests.providers.amazon.aws.utils.test_eks_constants import (

Review comment:
       ```suggestion
   from tests.providers.amazon.aws.utils.eks_constants import (
   ```
   
   If that file doesn't contain any tests we shouldn't prefix it with `test_`

##########
File path: tests/providers/amazon/aws/operators/test_eks.py
##########
@@ -0,0 +1,299 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from typing import List
+from unittest import mock
+
+from moto.eks.responses import DEFAULT_NEXT_TOKEN
+
+from airflow.providers.amazon.aws.hooks.eks import EksHook
+from airflow.providers.amazon.aws.operators.eks import (
+    EKSCreateClusterOperator,
+    EKSCreateNodegroupOperator,
+    EKSDeleteClusterOperator,
+    EKSDeleteNodegroupOperator,
+    EKSDescribeAllClustersOperator,
+    EKSDescribeAllNodegroupsOperator,
+    EKSDescribeClusterOperator,
+    EKSDescribeNodegroupOperator,
+    EKSListClustersOperator,
+    EKSListNodegroupsOperator,
+)
+from tests.providers.amazon.aws.utils.test_eks_constants import (
+    NODEROLE_ARN_VALUE,
+    RESOURCES_VPC_CONFIG_VALUE,
+    ROLE_ARN_VALUE,
+    STATUS_VALUE,
+    SUBNETS_VALUE,
+    TASK_ID,
+)
+from tests.providers.amazon.aws.utils.test_eks_utils import convert_keys, 
random_names

Review comment:
       Same here.

##########
File path: airflow/providers/amazon/aws/hooks/eks.py
##########
@@ -0,0 +1,346 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""Interact with Amazon EKS, using the boto3 library."""
+
+import json
+from typing import Dict, List, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.utils.json import AirflowJsonEncoder
+
+DEFAULT_RESULTS_PER_PAGE = 100
+DEFAULT_PAGINATION_TOKEN = ''
+
+
+class EKSHook(AwsBaseHook):

Review comment:
       Given it's an acronym keeping it as `EKSHook` is allowed, and just 
because we made the wrong decision in the past with AwsBaseHook doesn't mean we 
_have to_ stick by it here :) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

