ferruzzi commented on a change in pull request #16571:
URL: https://github.com/apache/airflow/pull/16571#discussion_r659924211



##########
File path: airflow/providers/amazon/aws/hooks/eks.py
##########
@@ -0,0 +1,346 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""Interact with Amazon EKS, using the boto3 library."""
+
+import json
+from typing import Dict, List, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.utils.json import AirflowJsonEncoder
+
+DEFAULT_RESULTS_PER_PAGE = 100
+DEFAULT_PAGINATION_TOKEN = ''
+
+
+class EKSHook(AwsBaseHook):

Review comment:
       Renamed to EksHook, you are right.  I like EKSHook better because it's 
an abbreviation but I'll follow conventions.

##########
File path: airflow/providers/amazon/aws/utils/eks_kube_config.py
##########
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from shutil import which
+from typing import Optional
+
+import boto3
+import yaml
+
+HOME = os.environ.get('HOME', '/tmp')
+DEFAULT_KUBE_CONFIG_FILENAME = 'config'
+DEFAULT_KUBE_CONFIG_PATH = str(os.path.join(HOME, '/.kube/', 
DEFAULT_KUBE_CONFIG_FILENAME))
+DEFAULT_CONTEXT_NAME = 'aws'
+DEFAULT_NAMESPACE_NAME = 'default'
+DEFAULT_POD_USERNAME = 'aws'
+
+
+def generate_config_file(
+    eks_cluster_name: str,
+    eks_namespace_name: str,
+    aws_profile: Optional[str],
+    kube_config_file_location: Optional[str] = DEFAULT_KUBE_CONFIG_PATH,
+    pod_username: Optional[str] = DEFAULT_POD_USERNAME,
+    pod_context: Optional[str] = DEFAULT_CONTEXT_NAME,
+    role_arn: Optional[str] = None,
+    aws_region: Optional[str] = None,
+) -> None:
+    """
+    Writes the kubeconfig file given an EKS Cluster name, AWS region, and file 
path.
+
+    :param eks_cluster_name: The name of the cluster to create the EKS Managed 
Nodegroup in.
+    :type eks_cluster_name: str
+    :param eks_namespace_name: The namespace to run within kubernetes.
+    :type eks_namespace_name: str
+    :param aws_profile: The named profile containing the credentials for the 
AWS CLI tool to use.
+    :type aws_profile: str
+    :param kube_config_file_location: Path to save the generated kube_config 
file to.
+    :type kube_config_file_location: str
+    :param pod_username: The username under which to execute the pod.
+    :type pod_username: str
+    :param pod_context: The name of the context access parameters to use.
+    :type pod_context: str
+    :param role_arn: The Amazon Resource Name (ARN) of the IAM role to 
associate with your nodegroup.
+    :type role_arn: str
+    :param aws_region: The name of the AWS Region the EKS Cluster resides in.
+    :type aws_region: str
+    """
+    installed = which("aws")
+    if installed is None:
+        message = (
+            "AWS CLI version 2 must be installed on the worker.  See: "
+            
"https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html";
+        )
+        print(message)
+        raise UnmetDependency(message)
+
+    # Set up the client
+    session = boto3.Session(region_name=aws_region, profile_name=aws_profile)
+    eks_client = session.client("eks")
+
+    # get cluster details
+    cluster = eks_client.describe_cluster(name=eks_cluster_name)
+    cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+    cluster_ep = cluster["cluster"]["endpoint"]
+
+    # build the cluster config hash
+    cli_args = [
+        "--region",
+        aws_region,
+        "eks",
+        "get-token",
+        "--cluster-name",
+        eks_cluster_name,
+    ]
+    if role_arn:
+        cli_args.extend(["--role-arn", role_arn])
+
+    cluster_config = {
+        "apiVersion": "v1",
+        "kind": "Config",
+        "clusters": [
+            {
+                "cluster": {"server": cluster_ep, 
"certificate-authority-data": cluster_cert},
+                "name": eks_cluster_name,
+            }
+        ],
+        "contexts": [
+            {
+                "context": {
+                    "cluster": eks_cluster_name,
+                    "namespace": eks_namespace_name,
+                    "user": pod_username,
+                },
+                "name": pod_context,
+            }
+        ],
+        "current-context": pod_context,
+        "preferences": {},
+        "users": [
+            {
+                "name": pod_username,
+                "user": {
+                    "exec": {
+                        "apiVersion": "client.authentication.k8s.io/v1alpha1",
+                        "args": cli_args,
+                        "command": "aws",
+                    }
+                },
+            }
+        ],
+    }
+
+    config_text = yaml.dump(cluster_config, default_flow_style=False)
+    open(kube_config_file_location, "w").write(config_text)

Review comment:
       ACK.  Good catch, thanks.

##########
File path: airflow/providers/amazon/aws/operators/eks.py
##########
@@ -0,0 +1,737 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""This module contains Amazon EKS operators."""
+import json
+import os
+from datetime import datetime
+from time import sleep
+from typing import Dict, List, Optional
+
+from boto3 import Session
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eks import DEFAULT_PAGINATION_TOKEN, 
DEFAULT_RESULTS_PER_PAGE, EKSHook
+from airflow.providers.amazon.aws.utils.eks_kube_config import (
+    DEFAULT_CONTEXT_NAME,
+    DEFAULT_KUBE_CONFIG_PATH,
+    DEFAULT_NAMESPACE_NAME,
+    DEFAULT_POD_USERNAME,
+    generate_config_file,
+)
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import 
KubernetesPodOperator
+
+CHECK_INTERVAL_SECONDS = 15
+TIMEOUT_SECONDS = 25 * 60
+CONN_ID = "eks"
+REGION = Session().region_name
+DEFAULT_COMPUTE_TYPE = 'nodegroup'
+DEFAULT_NODEGROUP_NAME_SUFFIX = '-nodegroup'
+DEFAULT_POD_NAME = 'pod'
+KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'
+
+
+class EKSCreateClusterOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Cluster control plane.
+
+    Optionally, can also create the supporting compute architecture:
+    If argument 'compute' is provided with a value of 'nodegroup', will also 
attempt to create an Amazon
+    EKS Managed Nodegroup for the cluster.  See EKSCreateNodegroupOperator 
documentation for requirements.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSCreateClusterOperator`
+
+    :param cluster_name: The unique name to give to your Amazon EKS Cluster.
+    :type cluster_name: str
+    :param cluster_role_arn: The Amazon Resource Name (ARN) of the IAM role 
that provides permissions for the
+       Kubernetes control plane to make calls to AWS API operations on your 
behalf.
+    :type cluster_role_arn: str
+    :param resources_vpc_config: The VPC configuration used by the cluster 
control plane.
+    :type resources_vpc_config: Dict
+    :param compute: The type of compute architecture to generate along with 
the cluster.
+        Defaults to 'nodegroup' to generate an EKS Managed Nodegroup.
+    :type compute: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    If 'compute' is 'nodegroup', the following are required:
+
+    :param nodegroup_name: The unique name to give your EKS Managed Nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_role_arn: The Amazon Resource Name (ARN) of the IAM role 
to associate
+         with the EKS Managed Nodegroup.
+    :type nodegroup_role_arn: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        cluster_role_arn: str,
+        resources_vpc_config: Dict,
+        nodegroup_name: Optional[str] = None,
+        nodegroup_role_arn: Optional[str] = None,
+        compute: Optional[str] = DEFAULT_COMPUTE_TYPE,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.clusterRoleArn = cluster_role_arn
+        self.resourcesVpcConfig = resources_vpc_config
+        self.compute = compute
+        self.conn_id = conn_id
+        self.region = region
+
+        if self.compute == 'nodegroup':
+            self.nodegroupName = nodegroup_name or self.clusterName + 
DEFAULT_NODEGROUP_NAME_SUFFIX
+            if nodegroup_role_arn:
+                self.nodegroupRoleArn = nodegroup_role_arn
+            else:
+                message = "Creating an EKS Managed Nodegroup requires 
nodegroup_role_arn to be passed in."
+                self.log.error(message)
+                raise AttributeError(message)
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        eks_hook.create_cluster(
+            name=self.clusterName,
+            roleArn=self.clusterRoleArn,
+            resourcesVpcConfig=self.resourcesVpcConfig,
+        )
+
+        if self.compute is not None:
+            self.log.info("Waiting for EKS Cluster to provision.  This will 
take some time.")
+
+            countdown = TIMEOUT_SECONDS
+            while eks_hook.get_cluster_state(clusterName=self.clusterName) != 
"ACTIVE":
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    self.log.info(
+                        "Waiting for cluster to start.  Checking again in %d 
seconds", CHECK_INTERVAL_SECONDS
+                    )
+                    sleep(CHECK_INTERVAL_SECONDS)
+                else:
+                    message = "Cluster is still inactive after the allocated 
time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        if self.compute == 'nodegroup':
+            eks_hook.create_nodegroup(
+                clusterName=self.clusterName,
+                nodegroupName=self.nodegroupName,
+                subnets=self.resourcesVpcConfig.get('subnetIds'),
+                nodeRole=self.nodegroupRoleArn,
+            )
+
+
+class EKSCreateNodegroupOperator(BaseOperator):
+    """
+    Creates am Amazon EKS Managed Nodegroup for an existing Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSCreateNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to create the 
managed nodegroup in.
+    :type cluster_name: str
+    :param nodegroup_name: The unique name to give your managed nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_subnets:
+        The subnets to use for the Auto Scaling group that is created for the 
managed nodegroup.
+    :type nodegroup_subnets: List[str]
+    :param nodegroup_role_arn:
+        The Amazon Resource Name (ARN) of the IAM role to associate with the 
managed nodegroup.
+    :type nodegroup_role_arn: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_subnets: List[str],
+        nodegroup_role_arn: str,
+        nodegroup_name: Optional[str],
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupSubnets = nodegroup_subnets
+        self.nodegroupRoleArn = nodegroup_role_arn
+        self.nodegroupName = nodegroup_name or cluster_name + 
datetime.now().strftime("%Y%m%d_%H%M%S")
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.create_nodegroup(
+            clusterName=self.clusterName,
+            nodegroupName=self.nodegroupName,
+            subnets=self.nodegroupSubnets,
+            nodeRole=self.nodegroupRoleArn,
+        )
+
+
+class EKSDeleteClusterOperator(BaseOperator):
+    """
+    Deletes the Amazon EKS Cluster control plane and all nodegroups attached 
to it.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDeleteClusterOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to delete.
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self, cluster_name: str, conn_id: Optional[str] = CONN_ID, region: 
Optional[str] = REGION, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        nodegroups = 
eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')
+        nodegroup_count = len(nodegroups)
+        if nodegroup_count > 0:
+            self.log.info(
+                "A cluster can not be deleted with attached nodegroups.  
Deleting %d nodegroups.",
+                nodegroup_count,
+            )
+            for group in nodegroups:
+                eks_hook.delete_nodegroup(clusterName=self.clusterName, 
nodegroupName=group)
+
+            # Scaling up the timeout based on the number of nodegroups that 
are being processed.
+            additional_seconds = 5 * 60
+            countdown = TIMEOUT_SECONDS + (nodegroup_count * 
additional_seconds)
+            while 
len(eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')) > 
0:
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    sleep(CHECK_INTERVAL_SECONDS)
+                    self.log.info(
+                        "Waiting for the remaining %s nodegroups to delete.  
Checking again in %d seconds.",
+                        nodegroup_count,
+                        CHECK_INTERVAL_SECONDS,
+                    )
+                else:
+                    message = "Nodegroups are still inactive after the 
allocated time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        self.log.info("No nodegroups remain, deleting cluster.")
+        return eks_hook.delete_cluster(name=self.clusterName)
+
+
+class EKSDeleteNodegroupOperator(BaseOperator):
+    """
+    Deletes an Amazon EKS Nodegroup from an Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDeleteNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster that is associated 
with your nodegroup.
+    :type cluster_name: str
+    :param nodegroup_name: The name of the nodegroup to delete.
+    :type nodegroup_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_name: str,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupName = nodegroup_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.delete_nodegroup(clusterName=self.clusterName, 
nodegroupName=self.nodegroupName)
+
+
+class EKSDescribeAllClustersOperator(BaseOperator):
+    """
+    Describes all Amazon EKS Clusters in your AWS account.
+
+    :param max_results: The maximum number of results to return.
+    :type max_results: int
+    :param next_token: The nextToken value returned from a previous paginated 
execution.
+    :type next_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+    :param verbose: Provides additional logging if set to True.  Defaults to 
False.
+    :type verbose: bool
+
+    """
+
+    def __init__(
+        self,
+        max_results: Optional[int] = DEFAULT_RESULTS_PER_PAGE,
+        next_token: Optional[str] = DEFAULT_PAGINATION_TOKEN,
+        verbose: Optional[bool] = False,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.maxResults = max_results
+        self.nextToken = next_token
+        self.verbose = verbose
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        response = eks_hook.list_clusters(
+            verbose=self.verbose, maxResults=self.maxResults, 
nextToken=self.nextToken
+        )
+        cluster_list = response.get('clusters')
+        next_token = response.get('nextToken')
+
+        result = []
+        for cluster in cluster_list:
+            full_describe = json.loads(eks_hook.describe_cluster(name=cluster))
+            cluster_details = json.dumps(full_describe.get('cluster'))
+            result.append(cluster_details)
+
+        if self.verbose is True:
+            self.log.info("\n\t".join(["Cluster Details:"] + result))
+
+        return {'nextToken': next_token, 'clusters': result}
+
+
+class EKSDescribeAllNodegroupsOperator(BaseOperator):
+    """
+    Describes all Amazon EKS Nodegroups associated with the specified EKS 
Cluster.
+
+    :param max_results: The maximum number of results to return.
+    :type max_results: int
+    :param next_token: The nextToken value returned from a previous paginated 
execution.
+    :type next_token: str
+    :param cluster_name: The name of the Amazon EKS Cluster to check..
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+    :param verbose: Provides additional logging if set to True.  Defaults to 
False.
+    :type verbose: bool
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        max_results: Optional[int] = DEFAULT_RESULTS_PER_PAGE,
+        next_token: Optional[str] = DEFAULT_PAGINATION_TOKEN,
+        verbose: Optional[bool] = False,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.maxResults = max_results
+        self.nextToken = next_token
+        self.verbose = verbose
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        response = eks_hook.list_nodegroups(
+            clusterName=self.clusterName,
+            verbose=self.verbose,
+            maxResults=self.maxResults,
+            nextToken=self.nextToken,
+        )
+        nodegroup_list = response.get('nodegroups')
+        next_token = response.get('nextToken')
+
+        result = []
+        for nodegroup in nodegroup_list:
+            full_describe = json.loads(
+                eks_hook.describe_nodegroup(clusterName=self.clusterName, 
nodegroupName=nodegroup)
+            )
+            nodegroup_details = json.dumps(full_describe.get('nodegroup'))
+            result.append(nodegroup_details)
+
+        if self.verbose is True:
+            self.log.info("\n\t".join(["Nodegroup Details:"] + result))
+
+        return {'nextToken': next_token, 'nodegroups': result}
+
+
+class EKSDescribeClusterOperator(BaseOperator):
+    """
+    Returns descriptive information about an Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDescribeClusterOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to describe.
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+    :param verbose: Provides additional logging if set to True.  Defaults to 
False.
+    :type verbose: bool
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        verbose: Optional[bool] = False,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.verbose = verbose
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        response = eks_hook.describe_cluster(name=self.clusterName, 
verbose=self.verbose)
+        response_json = json.loads(response)
+        # Extract the cluster data, drop the request metadata
+        cluster_data = response_json.get('cluster')
+        return json.dumps(cluster_data)
+
+
+class EKSDescribeNodegroupOperator(BaseOperator):
+    """
+    Returns descriptive information about the Amazon EKS Nodegroup.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDescribeNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster associated with 
the nodegroup.
+    :type cluster_name: str
+    :param nodegroup_name: The name of the Amazon EKS Nodegroup to describe.
+    :type nodegroup_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+    :param verbose: Provides additional logging if set to True.  Defaults to 
False.
+    :type verbose: bool
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_name: str,
+        verbose: Optional[bool] = False,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupName = nodegroup_name
+        self.verbose = verbose
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        response = eks_hook.describe_nodegroup(
+            clusterName=self.clusterName, nodegroupName=self.nodegroupName, 
verbose=self.verbose
+        )
+        response_json = json.loads(response)
+        # Extract the nodegroup data, drop the request metadata
+        nodegroup_data = response_json.get('nodegroup')
+        return json.dumps(nodegroup_data)
+
+
+class EKSListClustersOperator(BaseOperator):
+    """
+    Lists the Amazon EKS Clusters in your AWS account with optional pagination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSListClustersOperator`
+
+    :param max_results: The maximum number of results to return.
+    :type max_results: int
+    :param next_token: The nextToken value returned from a previous paginated 
execution.
+    :type next_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+    :param verbose: Provides additional logging if set to True.  Defaults to 
False.
+    :type verbose: bool
+
+    """
+
+    def __init__(
+        self,
+        max_results: Optional[int] = DEFAULT_RESULTS_PER_PAGE,
+        next_token: Optional[str] = DEFAULT_PAGINATION_TOKEN,
+        verbose: Optional[bool] = False,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.maxResults = max_results
+        self.nextToken = next_token
+        self.verbose = verbose
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.list_clusters(
+            maxResults=self.maxResults, nextToken=self.nextToken, 
verbose=self.verbose
+        )
+
+
+class EKSListNodegroupsOperator(BaseOperator):
+    """
+    Lists the Amazon EKS Nodegroups associated with the specified EKS Cluster 
with optional pagination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSListNodegroupsOperator`
+
+    :param max_results: The maximum number of results to return.
+    :type max_results: int
+    :param next_token: The nextToken value returned from a previous paginated 
execution.
+    :type next_token: str
+    :param cluster_name: The name of the Amazon EKS Cluster to check..
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+    :param verbose: Provides additional logging if set to True.  Defaults to 
False.
+    :type verbose: bool
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        max_results: Optional[int] = DEFAULT_RESULTS_PER_PAGE,
+        next_token: Optional[str] = DEFAULT_PAGINATION_TOKEN,
+        verbose: Optional[bool] = False,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.maxResults = max_results
+        self.nextToken = next_token
+        self.verbose = verbose
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.list_nodegroups(
+            clusterName=self.clusterName,
+            verbose=self.verbose,
+            maxResults=self.maxResults,
+            nextToken=self.nextToken,
+        )
+
+
+class EKSPodOperator(KubernetesPodOperator):
+    """
+    Executes a task in a Kubernetes pod on the specified Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSPodOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to execute the 
task on.
+    :type cluster_name: str
+    :param cluster_role_arn: The Amazon Resource Name (ARN) of the IAM role 
that provides permissions
+       for the Kubernetes control plane to make calls to AWS API operations on 
your behalf.
+    :type cluster_role_arn: str
+    :param kube_config_file_path: Path to save the generated kube_config file 
to.
+    :type kube_config_file_path: str
+    :param in_cluster: If True, look for config inside the cluster; if False 
look for a local file path.
+    :type in_cluster: bool
+    :param namespace: The namespace in which to execute the pod.
+    :type namespace: str
+    :param pod_context: The security context to use while executing the pod.
+    :type pod_context: str
+    :param pod_name: The unique name to give the pod.
+    :type pod_name: str
+    :param pod_username: The username to use while executing the pod.
+    :type pod_username: str
+    :param aws_profile: The named profile containing the credentials for the 
AWS CLI tool to use.
+    :param aws_profile: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(  # pylint: disable=too-many-arguments,too-many-locals
+        self,
+        cluster_name: str,
+        cluster_role_arn: Optional[str] = None,
+        # A default path will be used if none is provided.
+        kube_config_file_path: Optional[str] = 
os.environ.get(KUBE_CONFIG_ENV_VAR, DEFAULT_KUBE_CONFIG_PATH),

Review comment:
       As it is written, the credential file will be overwritten each pod, yes. 
 I will look into alternatives.

##########
File path: airflow/providers/amazon/aws/sensors/eks.py
##########
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""Tracking the state of EKS Clusters and Nodegroups."""
+
+from typing import Optional
+
+from boto3 import Session
+
+from airflow.providers.amazon.aws.hooks.eks import EKSHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+CONN_ID = "eks"
+REGION = Session().region_name

Review comment:
       Upon further digging, it looks like we can set the default here to None 
and let the base hook do its magic if the user does not provide one.  I will 
try that and see what happens.
   
   Note to self:  Whatever we do here, also apply the same solution to the 
Operators.

##########
File path: airflow/providers/amazon/aws/operators/eks.py
##########
@@ -0,0 +1,737 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       Sure?   Which ones do you think would be valuable to add in here?

##########
File path: 
airflow/providers/amazon/aws/example_dags/example_eks_pod_operation.py
##########
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from datetime import datetime
+
+from airflow.models.dag import DAG
+from airflow.providers.amazon.aws.operators.eks import EKSPodOperator
+from airflow.utils.dates import days_ago
+
+####
+# NOTE: This example requires an existing EKS Cluster with a compute backend.
+# see: example_eks_create_cluster_with_nodegroup.py
+####
+
+CLUSTER_NAME = 'existing-cluster-with-nodegroup-ready-for-pod'
+BUCKET_SUFFIX = datetime.now().strftime("-%Y%b%d-%H%M").lower()
+ROLE_ARN = os.environ.get('ROLE_ARN', 
'arn:aws:iam::123456789012:role/role_name')
+
+with DAG(
+    dag_id='eks_run_pod_dag',
+    schedule_interval=None,
+    start_date=days_ago(2),
+    max_active_runs=1,
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_eks_pod_operator]
+    start_pod = EKSPodOperator(
+        task_id="run_pod",
+        cluster_name=CLUSTER_NAME,
+        # Optional IAM Role to assume for credentials when signing the token.
+        cluster_role_arn=ROLE_ARN,
+        image="amazon/aws-cli:latest",
+        cmds=["sh", "-c", "aws s3 mb s3://hello-world" + BUCKET_SUFFIX],

Review comment:
       ACK.  I'll change this to just echo to the terminal or something.

##########
File path: docs/apache-airflow-providers-amazon/operators/eks.rst
##########
@@ -0,0 +1,265 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Amazon Elastic Kubernetes Service (EKS) Operators
+=================================================
+
+`Amazon Elastic Kubernetes Service (Amazon EKS) 
<https://aws.amazon.com/eks/>`__  is a managed service
+that makes it easy for you to run Kubernetes on AWS without needing to stand 
up or maintain your own
+Kubernetes control plane. Kubernetes is an open-source system for automating 
the deployment, scaling,
+and management of containerized applications.
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+Overview
+--------
+
+Airflow to Amazon Elastic Kubernetes Service (EKS) integration provides 
Operators to create and
+interact with the EKS clusters and compute infrastructure.
+
+ - :class:`~airflow.providers.amazon.aws.operators.eks`
+
+4 example_dags are provided which showcase these operators in action.
+
+ - example_eks_create_cluster.py
+ - example_eks_create_cluster_with_nodegroup.py
+ - example_eks_create_nodegroup.py
+ - example_eks_pod_operator.py

Review comment:
       Does the spell check know to ignore anything inside double back ticks?

##########
File path: airflow/providers/amazon/aws/operators/eks.py
##########
@@ -0,0 +1,737 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""This module contains Amazon EKS operators."""
+import json
+import os
+from datetime import datetime
+from time import sleep
+from typing import Dict, List, Optional
+
+from boto3 import Session
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eks import DEFAULT_PAGINATION_TOKEN, 
DEFAULT_RESULTS_PER_PAGE, EKSHook
+from airflow.providers.amazon.aws.utils.eks_kube_config import (
+    DEFAULT_CONTEXT_NAME,
+    DEFAULT_KUBE_CONFIG_PATH,
+    DEFAULT_NAMESPACE_NAME,
+    DEFAULT_POD_USERNAME,
+    generate_config_file,
+)
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import 
KubernetesPodOperator
+
+CHECK_INTERVAL_SECONDS = 15
+TIMEOUT_SECONDS = 25 * 60
+CONN_ID = "eks"
+REGION = Session().region_name
+DEFAULT_COMPUTE_TYPE = 'nodegroup'
+DEFAULT_NODEGROUP_NAME_SUFFIX = '-nodegroup'
+DEFAULT_POD_NAME = 'pod'
+KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'
+
+
+class EKSCreateClusterOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Cluster control plane.
+
+    Optionally, can also create the supporting compute architecture:
+    If argument 'compute' is provided with a value of 'nodegroup', will also 
attempt to create an Amazon
+    EKS Managed Nodegroup for the cluster.  See EKSCreateNodegroupOperator 
documentation for requirements.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSCreateClusterOperator`
+
+    :param cluster_name: The unique name to give to your Amazon EKS Cluster.
+    :type cluster_name: str
+    :param cluster_role_arn: The Amazon Resource Name (ARN) of the IAM role 
that provides permissions for the
+       Kubernetes control plane to make calls to AWS API operations on your 
behalf.
+    :type cluster_role_arn: str
+    :param resources_vpc_config: The VPC configuration used by the cluster 
control plane.
+    :type resources_vpc_config: Dict
+    :param compute: The type of compute architecture to generate along with 
the cluster.
+        Defaults to 'nodegroup' to generate an EKS Managed Nodegroup.
+    :type compute: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    If 'compute' is 'nodegroup', the following are required:
+
+    :param nodegroup_name: The unique name to give your EKS Managed Nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_role_arn: The Amazon Resource Name (ARN) of the IAM role 
to associate
+         with the EKS Managed Nodegroup.
+    :type nodegroup_role_arn: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        cluster_role_arn: str,
+        resources_vpc_config: Dict,
+        nodegroup_name: Optional[str] = None,
+        nodegroup_role_arn: Optional[str] = None,
+        compute: Optional[str] = DEFAULT_COMPUTE_TYPE,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.clusterRoleArn = cluster_role_arn
+        self.resourcesVpcConfig = resources_vpc_config
+        self.compute = compute
+        self.conn_id = conn_id
+        self.region = region
+
+        if self.compute == 'nodegroup':
+            self.nodegroupName = nodegroup_name or self.clusterName + 
DEFAULT_NODEGROUP_NAME_SUFFIX
+            if nodegroup_role_arn:
+                self.nodegroupRoleArn = nodegroup_role_arn
+            else:
+                message = "Creating an EKS Managed Nodegroup requires 
nodegroup_role_arn to be passed in."
+                self.log.error(message)
+                raise AttributeError(message)
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        eks_hook.create_cluster(
+            name=self.clusterName,
+            roleArn=self.clusterRoleArn,
+            resourcesVpcConfig=self.resourcesVpcConfig,
+        )
+
+        if self.compute is not None:
+            self.log.info("Waiting for EKS Cluster to provision.  This will 
take some time.")
+
+            countdown = TIMEOUT_SECONDS
+            while eks_hook.get_cluster_state(clusterName=self.clusterName) != 
"ACTIVE":
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    self.log.info(
+                        "Waiting for cluster to start.  Checking again in %d 
seconds", CHECK_INTERVAL_SECONDS
+                    )
+                    sleep(CHECK_INTERVAL_SECONDS)
+                else:
+                    message = "Cluster is still inactive after the allocated 
time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        if self.compute == 'nodegroup':
+            eks_hook.create_nodegroup(
+                clusterName=self.clusterName,
+                nodegroupName=self.nodegroupName,
+                subnets=self.resourcesVpcConfig.get('subnetIds'),
+                nodeRole=self.nodegroupRoleArn,
+            )
+
+
+class EKSCreateNodegroupOperator(BaseOperator):
+    """
+    Creates am Amazon EKS Managed Nodegroup for an existing Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSCreateNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to create the 
managed nodegroup in.
+    :type cluster_name: str
+    :param nodegroup_name: The unique name to give your managed nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_subnets:
+        The subnets to use for the Auto Scaling group that is created for the 
managed nodegroup.
+    :type nodegroup_subnets: List[str]
+    :param nodegroup_role_arn:
+        The Amazon Resource Name (ARN) of the IAM role to associate with the 
managed nodegroup.
+    :type nodegroup_role_arn: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_subnets: List[str],
+        nodegroup_role_arn: str,
+        nodegroup_name: Optional[str],
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupSubnets = nodegroup_subnets
+        self.nodegroupRoleArn = nodegroup_role_arn
+        self.nodegroupName = nodegroup_name or cluster_name + 
datetime.now().strftime("%Y%m%d_%H%M%S")
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.create_nodegroup(
+            clusterName=self.clusterName,
+            nodegroupName=self.nodegroupName,
+            subnets=self.nodegroupSubnets,
+            nodeRole=self.nodegroupRoleArn,
+        )
+
+
+class EKSDeleteClusterOperator(BaseOperator):
+    """
+    Deletes the Amazon EKS Cluster control plane and all nodegroups attached 
to it.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDeleteClusterOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to delete.
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self, cluster_name: str, conn_id: Optional[str] = CONN_ID, region: 
Optional[str] = REGION, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        nodegroups = 
eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')
+        nodegroup_count = len(nodegroups)
+        if nodegroup_count > 0:
+            self.log.info(
+                "A cluster can not be deleted with attached nodegroups.  
Deleting %d nodegroups.",
+                nodegroup_count,
+            )
+            for group in nodegroups:
+                eks_hook.delete_nodegroup(clusterName=self.clusterName, 
nodegroupName=group)
+
+            # Scaling up the timeout based on the number of nodegroups that 
are being processed.
+            additional_seconds = 5 * 60
+            countdown = TIMEOUT_SECONDS + (nodegroup_count * 
additional_seconds)
+            while 
len(eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')) > 
0:
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    sleep(CHECK_INTERVAL_SECONDS)
+                    self.log.info(
+                        "Waiting for the remaining %s nodegroups to delete.  
Checking again in %d seconds.",
+                        nodegroup_count,
+                        CHECK_INTERVAL_SECONDS,
+                    )
+                else:
+                    message = "Nodegroups are still inactive after the 
allocated time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        self.log.info("No nodegroups remain, deleting cluster.")
+        return eks_hook.delete_cluster(name=self.clusterName)
+
+
+class EKSDeleteNodegroupOperator(BaseOperator):
+    """
+    Deletes an Amazon EKS Nodegroup from an Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDeleteNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster that is associated 
with your nodegroup.
+    :type cluster_name: str
+    :param nodegroup_name: The name of the nodegroup to delete.
+    :type nodegroup_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_name: str,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupName = nodegroup_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.delete_nodegroup(clusterName=self.clusterName, 
nodegroupName=self.nodegroupName)
+
+
+class EKSDescribeAllClustersOperator(BaseOperator):
+    """
+    Describes all Amazon EKS Clusters in your AWS account.
+
+    :param max_results: The maximum number of results to return.
+    :type max_results: int
+    :param next_token: The nextToken value returned from a previous paginated 
execution.

Review comment:
       I'm not sure I follow the question.   These are passed in from the DAG 
like any other parameter.
   
   Many of the EKS APIs have a max result value of 100 entries so there should 
be a way to get the next paginated set.  They could get the nextToken through a 
prior task's XCOM or whatever, but I felt it should be available.

##########
File path: airflow/providers/amazon/aws/utils/eks_kube_config.py
##########
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from shutil import which
+from typing import Optional
+
+import boto3
+import yaml
+
+HOME = os.environ.get('HOME', '/tmp')
+DEFAULT_KUBE_CONFIG_FILENAME = 'config'
+DEFAULT_KUBE_CONFIG_PATH = str(os.path.join(HOME, '/.kube/', 
DEFAULT_KUBE_CONFIG_FILENAME))
+DEFAULT_CONTEXT_NAME = 'aws'
+DEFAULT_NAMESPACE_NAME = 'default'
+DEFAULT_POD_USERNAME = 'aws'
+
+
+def generate_config_file(
+    eks_cluster_name: str,
+    eks_namespace_name: str,
+    aws_profile: Optional[str],
+    kube_config_file_location: Optional[str] = DEFAULT_KUBE_CONFIG_PATH,
+    pod_username: Optional[str] = DEFAULT_POD_USERNAME,
+    pod_context: Optional[str] = DEFAULT_CONTEXT_NAME,
+    role_arn: Optional[str] = None,
+    aws_region: Optional[str] = None,
+) -> None:
+    """
+    Writes the kubeconfig file given an EKS Cluster name, AWS region, and file 
path.
+
+    :param eks_cluster_name: The name of the cluster to create the EKS Managed 
Nodegroup in.
+    :type eks_cluster_name: str
+    :param eks_namespace_name: The namespace to run within kubernetes.
+    :type eks_namespace_name: str
+    :param aws_profile: The named profile containing the credentials for the 
AWS CLI tool to use.
+    :type aws_profile: str
+    :param kube_config_file_location: Path to save the generated kube_config 
file to.
+    :type kube_config_file_location: str
+    :param pod_username: The username under which to execute the pod.
+    :type pod_username: str
+    :param pod_context: The name of the context access parameters to use.
+    :type pod_context: str
+    :param role_arn: The Amazon Resource Name (ARN) of the IAM role to 
associate with your nodegroup.
+    :type role_arn: str
+    :param aws_region: The name of the AWS Region the EKS Cluster resides in.
+    :type aws_region: str
+    """
+    installed = which("aws")

Review comment:
       The code as submitted is working on the Breeze test environment, I don't 
think it should matter what's hosting it.  The line you highlighted 
specifically is only there as a check to make sure the AWS CLI tool is 
installed as that is (currently) a prerequisite.

##########
File path: airflow/providers/amazon/aws/utils/eks_kube_config.py
##########
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from shutil import which
+from typing import Optional
+
+import boto3
+import yaml
+
+HOME = os.environ.get('HOME', '/tmp')
+DEFAULT_KUBE_CONFIG_FILENAME = 'config'
+DEFAULT_KUBE_CONFIG_PATH = str(os.path.join(HOME, '/.kube/', 
DEFAULT_KUBE_CONFIG_FILENAME))
+DEFAULT_CONTEXT_NAME = 'aws'
+DEFAULT_NAMESPACE_NAME = 'default'
+DEFAULT_POD_USERNAME = 'aws'
+
+
+def generate_config_file(
+    eks_cluster_name: str,
+    eks_namespace_name: str,
+    aws_profile: Optional[str],
+    kube_config_file_location: Optional[str] = DEFAULT_KUBE_CONFIG_PATH,
+    pod_username: Optional[str] = DEFAULT_POD_USERNAME,
+    pod_context: Optional[str] = DEFAULT_CONTEXT_NAME,
+    role_arn: Optional[str] = None,
+    aws_region: Optional[str] = None,
+) -> None:
+    """
+    Writes the kubeconfig file given an EKS Cluster name, AWS region, and file 
path.
+
+    :param eks_cluster_name: The name of the cluster to create the EKS Managed 
Nodegroup in.
+    :type eks_cluster_name: str
+    :param eks_namespace_name: The namespace to run within kubernetes.
+    :type eks_namespace_name: str
+    :param aws_profile: The named profile containing the credentials for the 
AWS CLI tool to use.
+    :type aws_profile: str
+    :param kube_config_file_location: Path to save the generated kube_config 
file to.
+    :type kube_config_file_location: str
+    :param pod_username: The username under which to execute the pod.
+    :type pod_username: str
+    :param pod_context: The name of the context access parameters to use.
+    :type pod_context: str
+    :param role_arn: The Amazon Resource Name (ARN) of the IAM role to 
associate with your nodegroup.
+    :type role_arn: str
+    :param aws_region: The name of the AWS Region the EKS Cluster resides in.
+    :type aws_region: str
+    """
+    installed = which("aws")
+    if installed is None:
+        message = (
+            "AWS CLI version 2 must be installed on the worker.  See: "
+            
"https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html";
+        )
+        print(message)
+        raise UnmetDependency(message)
+
+    # Set up the client
+    session = boto3.Session(region_name=aws_region, profile_name=aws_profile)
+    eks_client = session.client("eks")
+
+    # get cluster details
+    cluster = eks_client.describe_cluster(name=eks_cluster_name)
+    cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+    cluster_ep = cluster["cluster"]["endpoint"]
+
+    # build the cluster config hash
+    cli_args = [
+        "--region",
+        aws_region,
+        "eks",
+        "get-token",
+        "--cluster-name",
+        eks_cluster_name,
+    ]
+    if role_arn:
+        cli_args.extend(["--role-arn", role_arn])

Review comment:
       Using the AWS CLI was not our preference either, but had to settle for 
this due to other options not working correctly.  The EKS service does not 
support providing the token directly at all.  Searching online came up with a 
few workarounds, but the lesser evil was assuming that someone using an AWS 
service would be able to install an AWS tool.  There is also precedent for it 
in [Google's Kubernetes 
Operators](https://github.com/apache/airflow/blob/2625007c8aeca9ed98dea361ba13c2622482d71f/airflow/providers/google/cloud/operators/kubernetes_engine.py#L319)
 using the gcloud local script to accomplish the same thing.
   
   I'll test using your provided example to see if that works and report back.  
Dropping that dependency would be great.

##########
File path: docs/apache-airflow-providers-amazon/operators/eks.rst
##########
@@ -0,0 +1,265 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Amazon Elastic Kubernetes Service (EKS) Operators
+=================================================
+
+`Amazon Elastic Kubernetes Service (Amazon EKS) 
<https://aws.amazon.com/eks/>`__  is a managed service
+that makes it easy for you to run Kubernetes on AWS without needing to stand 
up or maintain your own
+Kubernetes control plane. Kubernetes is an open-source system for automating 
the deployment, scaling,
+and management of containerized applications.
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+Overview
+--------
+
+Airflow to Amazon Elastic Kubernetes Service (EKS) integration provides 
Operators to create and
+interact with the EKS clusters and compute infrastructure.
+
+ - :class:`~airflow.providers.amazon.aws.operators.eks`
+
+4 example_dags are provided which showcase these operators in action.
+
+ - example_eks_create_cluster.py
+ - example_eks_create_cluster_with_nodegroup.py
+ - example_eks_create_nodegroup.py
+ - example_eks_pod_operator.py
+
+
+.. _howto/operator:EKSCreateClusterOperator:
+
+Creating Amazon EKS Clusters
+----------------------------
+
+Purpose
+"""""""
+
+This example dag ``example_eks_create_cluster.py`` uses 
``EKSCreateClusterOperator`` to create an Amazon
+EKS Cluster, ``EKSListClustersOperator`` and ``EKSDescribeClusterOperator`` to 
verify creation, then
+``EKSDeleteClusterOperator`` to delete the Cluster.
+
+Prerequisites
+"""""""""""""
+
+An AWS IAM role with the following permissions:
+
+  "eks.amazonaws.com" must be added to the Trusted Relationships
+  "AmazonEKSClusterPolicy" IAM Policy must be attached
+
+Defining tasks
+""""""""""""""
+
+In the following code we create a new Amazon EKS Cluster.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_cluster.py
+    :language: python
+    :start-after: [START howto_operator_eks_create_cluster]
+    :end-before: [END howto_operator_eks_create_cluster]
+
+
+.. _howto/operator:EKSListClustersOperator:
+.. _howto/operator:EKSDescribeClusterOperator:
+
+
+Listing and Describing Amazon EKS Clusters
+-------------------------------------------
+
+Defining tasks
+""""""""""""""
+
+In the following code we list all Amazon EKS Clusters.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_cluster.py
+    :language: python
+    :start-after: [START howto_operator_eks_list_clusters]
+    :end-before: [END howto_operator_eks_list_clusters]
+
+In the following code we retrieve details for a given Amazon EKS Cluster.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_cluster.py
+    :language: python
+    :start-after: [START howto_operator_eks_describe_cluster]
+    :end-before: [END howto_operator_eks_describe_cluster]
+
+
+.. _howto/operator:EKSDeleteClusterOperator:
+
+Deleting Amazon EKS Clusters
+----------------------------
+
+Defining tasks
+""""""""""""""
+
+In the following code we delete a given Amazon EKS Cluster.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_cluster.py
+    :language: python
+    :start-after: [START howto_operator_eks_delete_cluster]
+    :end-before: [END howto_operator_eks_delete_cluster]
+
+
+.. _howto/operator:EKSCreateNodegroupOperator:
+
+Creating Amazon EKS Managed NodeGroups
+--------------------------------------
+
+Purpose
+"""""""
+
+This example dag ``example_eks_create_nodegroup.py`` uses 
``EKSCreateNodegroupOperator``
+to create an Amazon EKS Managed Nodegroup using an existing cluster, 
``EKSListNodegroupsOperator``
+and ``EKSDescribeNodegroupOperator`` to verify creation, then 
``EKSDeleteNodegroupOperator``
+to delete the nodegroup.
+
+Prerequisites
+"""""""""""""
+
+An AWS IAM role with the following permissions:
+
+  "ec2.amazon.aws.com" must be in the Trusted Relationships
+  "AmazonEC2ContainerRegistryReadOnly" IAM Policy must be attached
+  "AmazonEKSWorkerNodePolicy" IAM Policy must be attached
+
+Defining tasks
+""""""""""""""
+
+In the following code we create a new Amazon EKS Managed Nodegroup.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_nodegroup.py
+    :language: python
+    :start-after: [START howto_operator_eks_create_nodegroup]
+    :end-before: [END howto_operator_eks_create_nodegroup]
+
+
+.. _howto/operator:EKSListNodegroupsOperator:
+.. _howto/operator:EKSDescribeNodegroupOperator:
+
+Listing and Describing Amazon EKS Clusters
+-------------------------------------------
+
+Defining tasks
+""""""""""""""
+
+In the following code we retrieve details for a given Amazon EKS nodegroup.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_nodegroup.py
+    :language: python
+    :start-after: [START howto_operator_eks_describe_nodegroup]
+    :end-before: [END howto_operator_eks_describe_nodegroup]
+
+
+In the following code we list all Amazon EKS Nodegroups in a given EKS Cluster.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_nodegroup.py
+    :language: python
+    :start-after: [START howto_operator_eks_list_nodegroup]
+    :end-before: [END howto_operator_eks_list_nodegroup]
+
+
+.. _howto/operator:EKSDeleteNodegroupOperator:
+
+Deleting Amazon EKS Managed Nodegroups
+--------------------------------------
+
+Defining tasks
+""""""""""""""
+
+In the following code we delete an Amazon EKS nodegroup.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_nodegroup.py
+    :language: python
+    :start-after: [START howto_operator_eks_delete_nodegroup]
+    :end-before: [END howto_operator_eks_delete_nodegroup]
+
+
+Creating Amazon EKS Clusters and Node Groups Together
+------------------------------------------------------
+
+Purpose
+"""""""
+
+This example dag ``example_eks_create_stack.py`` demonstrates using
+``EKSCreateClusterOperator`` to create an Amazon EKS cluster and underlying
+Amazon EKS node group in one command.  ``EKSDescribeClustersOperator`` and
+``EKSDescribeNodegroupsOperator`` verify creation, then 
``EKSDeleteClusterOperator``
+deletes all created resources.
+
+Prerequisites
+"""""""""""""
+
+  "ec2.amazon.aws.com" must be in the Trusted Relationships
+  "eks.amazonaws.com" must be added to the Trusted Relationships
+  "AmazonEC2ContainerRegistryReadOnly" IAM Policy must be attached
+  "AmazonEKSClusterPolicy" IAM Policy must be attached
+  "AmazonEKSWorkerNodePolicy" IAM Policy must be attached

Review comment:
       Does the spell check know to ignore anything inside double back ticks?

##########
File path: airflow/providers/amazon/aws/hooks/eks.py
##########
@@ -0,0 +1,346 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""Interact with Amazon EKS, using the boto3 library."""
+
+import json
+from typing import Dict, List, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.utils.json import AirflowJsonEncoder
+
+DEFAULT_RESULTS_PER_PAGE = 100
+DEFAULT_PAGINATION_TOKEN = ''
+
+
+class EKSHook(AwsBaseHook):

Review comment:
       Fixed in 616d249

##########
File path: docs/apache-airflow-providers-amazon/operators/eks.rst
##########
@@ -0,0 +1,265 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Amazon Elastic Kubernetes Service (EKS) Operators
+=================================================
+
+`Amazon Elastic Kubernetes Service (Amazon EKS) 
<https://aws.amazon.com/eks/>`__  is a managed service
+that makes it easy for you to run Kubernetes on AWS without needing to stand 
up or maintain your own
+Kubernetes control plane. Kubernetes is an open-source system for automating 
the deployment, scaling,
+and management of containerized applications.
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+Overview
+--------
+
+Airflow to Amazon Elastic Kubernetes Service (EKS) integration provides 
Operators to create and
+interact with the EKS clusters and compute infrastructure.
+
+ - :class:`~airflow.providers.amazon.aws.operators.eks`
+
+4 example_dags are provided which showcase these operators in action.
+
+ - example_eks_create_cluster.py
+ - example_eks_create_cluster_with_nodegroup.py
+ - example_eks_create_nodegroup.py
+ - example_eks_pod_operator.py
+
+
+.. _howto/operator:EKSCreateClusterOperator:
+
+Creating Amazon EKS Clusters
+----------------------------
+
+Purpose
+"""""""
+
+This example dag ``example_eks_create_cluster.py`` uses 
``EKSCreateClusterOperator`` to create an Amazon
+EKS Cluster, ``EKSListClustersOperator`` and ``EKSDescribeClusterOperator`` to 
verify creation, then
+``EKSDeleteClusterOperator`` to delete the Cluster.
+
+Prerequisites
+"""""""""""""
+
+An AWS IAM role with the following permissions:
+
+  "eks.amazonaws.com" must be added to the Trusted Relationships
+  "AmazonEKSClusterPolicy" IAM Policy must be attached
+
+Defining tasks
+""""""""""""""
+
+In the following code we create a new Amazon EKS Cluster.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_cluster.py
+    :language: python
+    :start-after: [START howto_operator_eks_create_cluster]
+    :end-before: [END howto_operator_eks_create_cluster]
+
+
+.. _howto/operator:EKSListClustersOperator:
+.. _howto/operator:EKSDescribeClusterOperator:
+
+
+Listing and Describing Amazon EKS Clusters
+-------------------------------------------
+
+Defining tasks
+""""""""""""""
+
+In the following code we list all Amazon EKS Clusters.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_cluster.py
+    :language: python
+    :start-after: [START howto_operator_eks_list_clusters]
+    :end-before: [END howto_operator_eks_list_clusters]
+
+In the following code we retrieve details for a given Amazon EKS Cluster.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_cluster.py
+    :language: python
+    :start-after: [START howto_operator_eks_describe_cluster]
+    :end-before: [END howto_operator_eks_describe_cluster]
+
+
+.. _howto/operator:EKSDeleteClusterOperator:
+
+Deleting Amazon EKS Clusters
+----------------------------
+
+Defining tasks
+""""""""""""""
+
+In the following code we delete a given Amazon EKS Cluster.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_cluster.py
+    :language: python
+    :start-after: [START howto_operator_eks_delete_cluster]
+    :end-before: [END howto_operator_eks_delete_cluster]
+
+
+.. _howto/operator:EKSCreateNodegroupOperator:
+
+Creating Amazon EKS Managed NodeGroups
+--------------------------------------
+
+Purpose
+"""""""
+
+This example dag ``example_eks_create_nodegroup.py`` uses 
``EKSCreateNodegroupOperator``
+to create an Amazon EKS Managed Nodegroup using an existing cluster, 
``EKSListNodegroupsOperator``
+and ``EKSDescribeNodegroupOperator`` to verify creation, then 
``EKSDeleteNodegroupOperator``
+to delete the nodegroup.
+
+Prerequisites
+"""""""""""""
+
+An AWS IAM role with the following permissions:
+
+  "ec2.amazon.aws.com" must be in the Trusted Relationships
+  "AmazonEC2ContainerRegistryReadOnly" IAM Policy must be attached
+  "AmazonEKSWorkerNodePolicy" IAM Policy must be attached
+
+Defining tasks
+""""""""""""""
+
+In the following code we create a new Amazon EKS Managed Nodegroup.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_nodegroup.py
+    :language: python
+    :start-after: [START howto_operator_eks_create_nodegroup]
+    :end-before: [END howto_operator_eks_create_nodegroup]
+
+
+.. _howto/operator:EKSListNodegroupsOperator:
+.. _howto/operator:EKSDescribeNodegroupOperator:
+
+Listing and Describing Amazon EKS Clusters
+-------------------------------------------
+
+Defining tasks
+""""""""""""""
+
+In the following code we retrieve details for a given Amazon EKS nodegroup.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_nodegroup.py
+    :language: python
+    :start-after: [START howto_operator_eks_describe_nodegroup]
+    :end-before: [END howto_operator_eks_describe_nodegroup]
+
+
+In the following code we list all Amazon EKS Nodegroups in a given EKS Cluster.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_nodegroup.py
+    :language: python
+    :start-after: [START howto_operator_eks_list_nodegroup]
+    :end-before: [END howto_operator_eks_list_nodegroup]
+
+
+.. _howto/operator:EKSDeleteNodegroupOperator:
+
+Deleting Amazon EKS Managed Nodegroups
+--------------------------------------
+
+Defining tasks
+""""""""""""""
+
+In the following code we delete an Amazon EKS nodegroup.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_eks_create_nodegroup.py
+    :language: python
+    :start-after: [START howto_operator_eks_delete_nodegroup]
+    :end-before: [END howto_operator_eks_delete_nodegroup]
+
+
+Creating Amazon EKS Clusters and Node Groups Together
+------------------------------------------------------
+
+Purpose
+"""""""
+
+This example dag ``example_eks_create_stack.py`` demonstrates using
+``EKSCreateClusterOperator`` to create an Amazon EKS cluster and underlying
+Amazon EKS node group in one command.  ``EKSDescribeClustersOperator`` and
+``EKSDescribeNodegroupsOperator`` verify creation, then 
``EKSDeleteClusterOperator``
+deletes all created resources.
+
+Prerequisites
+"""""""""""""
+
+  "ec2.amazon.aws.com" must be in the Trusted Relationships
+  "eks.amazonaws.com" must be added to the Trusted Relationships
+  "AmazonEC2ContainerRegistryReadOnly" IAM Policy must be attached
+  "AmazonEKSClusterPolicy" IAM Policy must be attached
+  "AmazonEKSWorkerNodePolicy" IAM Policy must be attached

Review comment:
       Fixed in 616d249

##########
File path: docs/apache-airflow-providers-amazon/operators/eks.rst
##########
@@ -0,0 +1,265 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Amazon Elastic Kubernetes Service (EKS) Operators
+=================================================
+
+`Amazon Elastic Kubernetes Service (Amazon EKS) 
<https://aws.amazon.com/eks/>`__  is a managed service
+that makes it easy for you to run Kubernetes on AWS without needing to stand 
up or maintain your own
+Kubernetes control plane. Kubernetes is an open-source system for automating 
the deployment, scaling,
+and management of containerized applications.
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+Overview
+--------
+
+Airflow to Amazon Elastic Kubernetes Service (EKS) integration provides 
Operators to create and
+interact with the EKS clusters and compute infrastructure.
+
+ - :class:`~airflow.providers.amazon.aws.operators.eks`
+
+4 example_dags are provided which showcase these operators in action.
+
+ - example_eks_create_cluster.py
+ - example_eks_create_cluster_with_nodegroup.py
+ - example_eks_create_nodegroup.py
+ - example_eks_pod_operator.py

Review comment:
       Fixed in 616d249

##########
File path: airflow/providers/amazon/aws/operators/eks.py
##########
@@ -0,0 +1,737 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""This module contains Amazon EKS operators."""
+import json
+import os
+from datetime import datetime
+from time import sleep
+from typing import Dict, List, Optional
+
+from boto3 import Session
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eks import DEFAULT_PAGINATION_TOKEN, 
DEFAULT_RESULTS_PER_PAGE, EKSHook
+from airflow.providers.amazon.aws.utils.eks_kube_config import (
+    DEFAULT_CONTEXT_NAME,
+    DEFAULT_KUBE_CONFIG_PATH,
+    DEFAULT_NAMESPACE_NAME,
+    DEFAULT_POD_USERNAME,
+    generate_config_file,
+)
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import 
KubernetesPodOperator
+
+CHECK_INTERVAL_SECONDS = 15
+TIMEOUT_SECONDS = 25 * 60
+CONN_ID = "eks"
+REGION = Session().region_name
+DEFAULT_COMPUTE_TYPE = 'nodegroup'
+DEFAULT_NODEGROUP_NAME_SUFFIX = '-nodegroup'
+DEFAULT_POD_NAME = 'pod'
+KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'
+
+
+class EKSCreateClusterOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Cluster control plane.
+
+    Optionally, can also create the supporting compute architecture:
+    If argument 'compute' is provided with a value of 'nodegroup', will also 
attempt to create an Amazon
+    EKS Managed Nodegroup for the cluster.  See EKSCreateNodegroupOperator 
documentation for requirements.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSCreateClusterOperator`
+
+    :param cluster_name: The unique name to give to your Amazon EKS Cluster.
+    :type cluster_name: str
+    :param cluster_role_arn: The Amazon Resource Name (ARN) of the IAM role 
that provides permissions for the
+       Kubernetes control plane to make calls to AWS API operations on your 
behalf.
+    :type cluster_role_arn: str
+    :param resources_vpc_config: The VPC configuration used by the cluster 
control plane.
+    :type resources_vpc_config: Dict
+    :param compute: The type of compute architecture to generate along with 
the cluster.
+        Defaults to 'nodegroup' to generate an EKS Managed Nodegroup.
+    :type compute: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    If 'compute' is 'nodegroup', the following are required:
+
+    :param nodegroup_name: The unique name to give your EKS Managed Nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_role_arn: The Amazon Resource Name (ARN) of the IAM role 
to associate
+         with the EKS Managed Nodegroup.
+    :type nodegroup_role_arn: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        cluster_role_arn: str,
+        resources_vpc_config: Dict,
+        nodegroup_name: Optional[str] = None,
+        nodegroup_role_arn: Optional[str] = None,
+        compute: Optional[str] = DEFAULT_COMPUTE_TYPE,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.clusterRoleArn = cluster_role_arn
+        self.resourcesVpcConfig = resources_vpc_config
+        self.compute = compute
+        self.conn_id = conn_id
+        self.region = region
+
+        if self.compute == 'nodegroup':
+            self.nodegroupName = nodegroup_name or self.clusterName + 
DEFAULT_NODEGROUP_NAME_SUFFIX
+            if nodegroup_role_arn:
+                self.nodegroupRoleArn = nodegroup_role_arn
+            else:
+                message = "Creating an EKS Managed Nodegroup requires 
nodegroup_role_arn to be passed in."
+                self.log.error(message)
+                raise AttributeError(message)
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        eks_hook.create_cluster(
+            name=self.clusterName,
+            roleArn=self.clusterRoleArn,
+            resourcesVpcConfig=self.resourcesVpcConfig,
+        )
+
+        if self.compute is not None:
+            self.log.info("Waiting for EKS Cluster to provision.  This will 
take some time.")
+
+            countdown = TIMEOUT_SECONDS
+            while eks_hook.get_cluster_state(clusterName=self.clusterName) != 
"ACTIVE":
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    self.log.info(
+                        "Waiting for cluster to start.  Checking again in %d 
seconds", CHECK_INTERVAL_SECONDS
+                    )
+                    sleep(CHECK_INTERVAL_SECONDS)
+                else:
+                    message = "Cluster is still inactive after the allocated 
time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        if self.compute == 'nodegroup':
+            eks_hook.create_nodegroup(
+                clusterName=self.clusterName,
+                nodegroupName=self.nodegroupName,
+                subnets=self.resourcesVpcConfig.get('subnetIds'),
+                nodeRole=self.nodegroupRoleArn,
+            )
+
+
+class EKSCreateNodegroupOperator(BaseOperator):
+    """
+    Creates am Amazon EKS Managed Nodegroup for an existing Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSCreateNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to create the 
managed nodegroup in.
+    :type cluster_name: str
+    :param nodegroup_name: The unique name to give your managed nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_subnets:
+        The subnets to use for the Auto Scaling group that is created for the 
managed nodegroup.
+    :type nodegroup_subnets: List[str]
+    :param nodegroup_role_arn:
+        The Amazon Resource Name (ARN) of the IAM role to associate with the 
managed nodegroup.
+    :type nodegroup_role_arn: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_subnets: List[str],
+        nodegroup_role_arn: str,
+        nodegroup_name: Optional[str],
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupSubnets = nodegroup_subnets
+        self.nodegroupRoleArn = nodegroup_role_arn
+        self.nodegroupName = nodegroup_name or cluster_name + 
datetime.now().strftime("%Y%m%d_%H%M%S")
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.create_nodegroup(
+            clusterName=self.clusterName,
+            nodegroupName=self.nodegroupName,
+            subnets=self.nodegroupSubnets,
+            nodeRole=self.nodegroupRoleArn,
+        )
+
+
+class EKSDeleteClusterOperator(BaseOperator):
+    """
+    Deletes the Amazon EKS Cluster control plane and all nodegroups attached 
to it.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDeleteClusterOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to delete.
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self, cluster_name: str, conn_id: Optional[str] = CONN_ID, region: 
Optional[str] = REGION, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        nodegroups = 
eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')
+        nodegroup_count = len(nodegroups)
+        if nodegroup_count > 0:
+            self.log.info(
+                "A cluster can not be deleted with attached nodegroups.  
Deleting %d nodegroups.",
+                nodegroup_count,
+            )
+            for group in nodegroups:
+                eks_hook.delete_nodegroup(clusterName=self.clusterName, 
nodegroupName=group)
+
+            # Scaling up the timeout based on the number of nodegroups that 
are being processed.
+            additional_seconds = 5 * 60
+            countdown = TIMEOUT_SECONDS + (nodegroup_count * 
additional_seconds)
+            while 
len(eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')) > 
0:
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    sleep(CHECK_INTERVAL_SECONDS)
+                    self.log.info(
+                        "Waiting for the remaining %s nodegroups to delete.  
Checking again in %d seconds.",
+                        nodegroup_count,
+                        CHECK_INTERVAL_SECONDS,
+                    )
+                else:
+                    message = "Nodegroups are still inactive after the 
allocated time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        self.log.info("No nodegroups remain, deleting cluster.")
+        return eks_hook.delete_cluster(name=self.clusterName)
+
+
+class EKSDeleteNodegroupOperator(BaseOperator):
+    """
+    Deletes an Amazon EKS Nodegroup from an Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDeleteNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster that is associated 
with your nodegroup.
+    :type cluster_name: str
+    :param nodegroup_name: The name of the nodegroup to delete.
+    :type nodegroup_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_name: str,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupName = nodegroup_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.delete_nodegroup(clusterName=self.clusterName, 
nodegroupName=self.nodegroupName)
+
+
+class EKSDescribeAllClustersOperator(BaseOperator):
+    """
+    Describes all Amazon EKS Clusters in your AWS account.
+
+    :param max_results: The maximum number of results to return.
+    :type max_results: int
+    :param next_token: The nextToken value returned from a previous paginated 
execution.

Review comment:
       I see.  I implemented the hooks to follow the API as closely as 
possible.   You think it would be better for the hook to just return everything 
and not allow pagination at all?

##########
File path: airflow/providers/amazon/aws/utils/eks_kube_config.py
##########
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from shutil import which
+from typing import Optional
+
+import boto3
+import yaml
+
+HOME = os.environ.get('HOME', '/tmp')
+DEFAULT_KUBE_CONFIG_FILENAME = 'config'
+DEFAULT_KUBE_CONFIG_PATH = str(os.path.join(HOME, '/.kube/', 
DEFAULT_KUBE_CONFIG_FILENAME))
+DEFAULT_CONTEXT_NAME = 'aws'
+DEFAULT_NAMESPACE_NAME = 'default'
+DEFAULT_POD_USERNAME = 'aws'
+
+
+def generate_config_file(
+    eks_cluster_name: str,
+    eks_namespace_name: str,
+    aws_profile: Optional[str],
+    kube_config_file_location: Optional[str] = DEFAULT_KUBE_CONFIG_PATH,
+    pod_username: Optional[str] = DEFAULT_POD_USERNAME,
+    pod_context: Optional[str] = DEFAULT_CONTEXT_NAME,
+    role_arn: Optional[str] = None,
+    aws_region: Optional[str] = None,
+) -> None:
+    """
+    Writes the kubeconfig file given an EKS Cluster name, AWS region, and file 
path.
+
+    :param eks_cluster_name: The name of the cluster to create the EKS Managed 
Nodegroup in.
+    :type eks_cluster_name: str
+    :param eks_namespace_name: The namespace to run within kubernetes.
+    :type eks_namespace_name: str
+    :param aws_profile: The named profile containing the credentials for the 
AWS CLI tool to use.
+    :type aws_profile: str
+    :param kube_config_file_location: Path to save the generated kube_config 
file to.
+    :type kube_config_file_location: str
+    :param pod_username: The username under which to execute the pod.
+    :type pod_username: str
+    :param pod_context: The name of the context access parameters to use.
+    :type pod_context: str
+    :param role_arn: The Amazon Resource Name (ARN) of the IAM role to 
associate with your nodegroup.
+    :type role_arn: str
+    :param aws_region: The name of the AWS Region the EKS Cluster resides in.
+    :type aws_region: str
+    """
+    installed = which("aws")
+    if installed is None:
+        message = (
+            "AWS CLI version 2 must be installed on the worker.  See: "
+            
"https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html";
+        )
+        print(message)
+        raise UnmetDependency(message)
+
+    # Set up the client
+    session = boto3.Session(region_name=aws_region, profile_name=aws_profile)
+    eks_client = session.client("eks")
+
+    # get cluster details
+    cluster = eks_client.describe_cluster(name=eks_cluster_name)
+    cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+    cluster_ep = cluster["cluster"]["endpoint"]
+
+    # build the cluster config hash
+    cli_args = [
+        "--region",
+        aws_region,
+        "eks",
+        "get-token",
+        "--cluster-name",
+        eks_cluster_name,
+    ]
+    if role_arn:
+        cli_args.extend(["--role-arn", role_arn])
+
+    cluster_config = {
+        "apiVersion": "v1",
+        "kind": "Config",
+        "clusters": [
+            {
+                "cluster": {"server": cluster_ep, 
"certificate-authority-data": cluster_cert},
+                "name": eks_cluster_name,
+            }
+        ],
+        "contexts": [
+            {
+                "context": {
+                    "cluster": eks_cluster_name,
+                    "namespace": eks_namespace_name,
+                    "user": pod_username,
+                },
+                "name": pod_context,
+            }
+        ],
+        "current-context": pod_context,
+        "preferences": {},
+        "users": [
+            {
+                "name": pod_username,
+                "user": {
+                    "exec": {
+                        "apiVersion": "client.authentication.k8s.io/v1alpha1",
+                        "args": cli_args,
+                        "command": "aws",
+                    }
+                },
+            }
+        ],
+    }
+
+    config_text = yaml.dump(cluster_config, default_flow_style=False)
+    open(kube_config_file_location, "w").write(config_text)

Review comment:
       Fixed in 616d249

##########
File path: airflow/providers/amazon/aws/utils/eks_kube_config.py
##########
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from shutil import which
+from typing import Optional
+
+import boto3
+import yaml
+
+HOME = os.environ.get('HOME', '/tmp')
+DEFAULT_KUBE_CONFIG_FILENAME = 'config'
+DEFAULT_KUBE_CONFIG_PATH = str(os.path.join(HOME, '/.kube/', 
DEFAULT_KUBE_CONFIG_FILENAME))
+DEFAULT_CONTEXT_NAME = 'aws'
+DEFAULT_NAMESPACE_NAME = 'default'
+DEFAULT_POD_USERNAME = 'aws'
+
+
+def generate_config_file(
+    eks_cluster_name: str,
+    eks_namespace_name: str,
+    aws_profile: Optional[str],
+    kube_config_file_location: Optional[str] = DEFAULT_KUBE_CONFIG_PATH,
+    pod_username: Optional[str] = DEFAULT_POD_USERNAME,
+    pod_context: Optional[str] = DEFAULT_CONTEXT_NAME,
+    role_arn: Optional[str] = None,
+    aws_region: Optional[str] = None,
+) -> None:
+    """
+    Writes the kubeconfig file given an EKS Cluster name, AWS region, and file 
path.
+
+    :param eks_cluster_name: The name of the cluster to create the EKS Managed 
Nodegroup in.
+    :type eks_cluster_name: str
+    :param eks_namespace_name: The namespace to run within kubernetes.
+    :type eks_namespace_name: str
+    :param aws_profile: The named profile containing the credentials for the 
AWS CLI tool to use.
+    :type aws_profile: str
+    :param kube_config_file_location: Path to save the generated kube_config 
file to.
+    :type kube_config_file_location: str
+    :param pod_username: The username under which to execute the pod.
+    :type pod_username: str
+    :param pod_context: The name of the context access parameters to use.
+    :type pod_context: str
+    :param role_arn: The Amazon Resource Name (ARN) of the IAM role to 
associate with your nodegroup.
+    :type role_arn: str
+    :param aws_region: The name of the AWS Region the EKS Cluster resides in.
+    :type aws_region: str
+    """
+    installed = which("aws")
+    if installed is None:
+        message = (
+            "AWS CLI version 2 must be installed on the worker.  See: "
+            
"https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html";
+        )
+        print(message)
+        raise UnmetDependency(message)
+
+    # Set up the client
+    session = boto3.Session(region_name=aws_region, profile_name=aws_profile)
+    eks_client = session.client("eks")
+
+    # get cluster details
+    cluster = eks_client.describe_cluster(name=eks_cluster_name)
+    cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+    cluster_ep = cluster["cluster"]["endpoint"]
+
+    # build the cluster config hash
+    cli_args = [
+        "--region",
+        aws_region,
+        "eks",
+        "get-token",
+        "--cluster-name",
+        eks_cluster_name,
+    ]
+    if role_arn:
+        cli_args.extend(["--role-arn", role_arn])

Review comment:
       Is the gcloud tool that the GKE operator uses in that link preinstalled 
in the official Docker image?  If so, could the AWS tool be as well?  

##########
File path: airflow/providers/amazon/aws/operators/eks.py
##########
@@ -0,0 +1,737 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=invalid-name
+"""This module contains Amazon EKS operators."""
+import json
+import os
+from datetime import datetime
+from time import sleep
+from typing import Dict, List, Optional
+
+from boto3 import Session
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eks import DEFAULT_PAGINATION_TOKEN, 
DEFAULT_RESULTS_PER_PAGE, EKSHook
+from airflow.providers.amazon.aws.utils.eks_kube_config import (
+    DEFAULT_CONTEXT_NAME,
+    DEFAULT_KUBE_CONFIG_PATH,
+    DEFAULT_NAMESPACE_NAME,
+    DEFAULT_POD_USERNAME,
+    generate_config_file,
+)
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import 
KubernetesPodOperator
+
+CHECK_INTERVAL_SECONDS = 15
+TIMEOUT_SECONDS = 25 * 60
+CONN_ID = "eks"
+REGION = Session().region_name
+DEFAULT_COMPUTE_TYPE = 'nodegroup'
+DEFAULT_NODEGROUP_NAME_SUFFIX = '-nodegroup'
+DEFAULT_POD_NAME = 'pod'
+KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'
+
+
+class EKSCreateClusterOperator(BaseOperator):
+    """
+    Creates an Amazon EKS Cluster control plane.
+
+    Optionally, can also create the supporting compute architecture:
+    If argument 'compute' is provided with a value of 'nodegroup', will also 
attempt to create an Amazon
+    EKS Managed Nodegroup for the cluster.  See EKSCreateNodegroupOperator 
documentation for requirements.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSCreateClusterOperator`
+
+    :param cluster_name: The unique name to give to your Amazon EKS Cluster.
+    :type cluster_name: str
+    :param cluster_role_arn: The Amazon Resource Name (ARN) of the IAM role 
that provides permissions for the
+       Kubernetes control plane to make calls to AWS API operations on your 
behalf.
+    :type cluster_role_arn: str
+    :param resources_vpc_config: The VPC configuration used by the cluster 
control plane.
+    :type resources_vpc_config: Dict
+    :param compute: The type of compute architecture to generate along with 
the cluster.
+        Defaults to 'nodegroup' to generate an EKS Managed Nodegroup.
+    :type compute: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    If 'compute' is 'nodegroup', the following are required:
+
+    :param nodegroup_name: The unique name to give your EKS Managed Nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_role_arn: The Amazon Resource Name (ARN) of the IAM role 
to associate
+         with the EKS Managed Nodegroup.
+    :type nodegroup_role_arn: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        cluster_role_arn: str,
+        resources_vpc_config: Dict,
+        nodegroup_name: Optional[str] = None,
+        nodegroup_role_arn: Optional[str] = None,
+        compute: Optional[str] = DEFAULT_COMPUTE_TYPE,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.clusterRoleArn = cluster_role_arn
+        self.resourcesVpcConfig = resources_vpc_config
+        self.compute = compute
+        self.conn_id = conn_id
+        self.region = region
+
+        if self.compute == 'nodegroup':
+            self.nodegroupName = nodegroup_name or self.clusterName + 
DEFAULT_NODEGROUP_NAME_SUFFIX
+            if nodegroup_role_arn:
+                self.nodegroupRoleArn = nodegroup_role_arn
+            else:
+                message = "Creating an EKS Managed Nodegroup requires 
nodegroup_role_arn to be passed in."
+                self.log.error(message)
+                raise AttributeError(message)
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        eks_hook.create_cluster(
+            name=self.clusterName,
+            roleArn=self.clusterRoleArn,
+            resourcesVpcConfig=self.resourcesVpcConfig,
+        )
+
+        if self.compute is not None:
+            self.log.info("Waiting for EKS Cluster to provision.  This will 
take some time.")
+
+            countdown = TIMEOUT_SECONDS
+            while eks_hook.get_cluster_state(clusterName=self.clusterName) != 
"ACTIVE":
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    self.log.info(
+                        "Waiting for cluster to start.  Checking again in %d 
seconds", CHECK_INTERVAL_SECONDS
+                    )
+                    sleep(CHECK_INTERVAL_SECONDS)
+                else:
+                    message = "Cluster is still inactive after the allocated 
time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        if self.compute == 'nodegroup':
+            eks_hook.create_nodegroup(
+                clusterName=self.clusterName,
+                nodegroupName=self.nodegroupName,
+                subnets=self.resourcesVpcConfig.get('subnetIds'),
+                nodeRole=self.nodegroupRoleArn,
+            )
+
+
+class EKSCreateNodegroupOperator(BaseOperator):
+    """
+    Creates am Amazon EKS Managed Nodegroup for an existing Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSCreateNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to create the 
managed nodegroup in.
+    :type cluster_name: str
+    :param nodegroup_name: The unique name to give your managed nodegroup.
+    :type nodegroup_name: str
+    :param nodegroup_subnets:
+        The subnets to use for the Auto Scaling group that is created for the 
managed nodegroup.
+    :type nodegroup_subnets: List[str]
+    :param nodegroup_role_arn:
+        The Amazon Resource Name (ARN) of the IAM role to associate with the 
managed nodegroup.
+    :type nodegroup_role_arn: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_subnets: List[str],
+        nodegroup_role_arn: str,
+        nodegroup_name: Optional[str],
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupSubnets = nodegroup_subnets
+        self.nodegroupRoleArn = nodegroup_role_arn
+        self.nodegroupName = nodegroup_name or cluster_name + 
datetime.now().strftime("%Y%m%d_%H%M%S")
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.create_nodegroup(
+            clusterName=self.clusterName,
+            nodegroupName=self.nodegroupName,
+            subnets=self.nodegroupSubnets,
+            nodeRole=self.nodegroupRoleArn,
+        )
+
+
+class EKSDeleteClusterOperator(BaseOperator):
+    """
+    Deletes the Amazon EKS Cluster control plane and all nodegroups attached 
to it.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDeleteClusterOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster to delete.
+    :type cluster_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self, cluster_name: str, conn_id: Optional[str] = CONN_ID, region: 
Optional[str] = REGION, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        nodegroups = 
eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')
+        nodegroup_count = len(nodegroups)
+        if nodegroup_count > 0:
+            self.log.info(
+                "A cluster can not be deleted with attached nodegroups.  
Deleting %d nodegroups.",
+                nodegroup_count,
+            )
+            for group in nodegroups:
+                eks_hook.delete_nodegroup(clusterName=self.clusterName, 
nodegroupName=group)
+
+            # Scaling up the timeout based on the number of nodegroups that 
are being processed.
+            additional_seconds = 5 * 60
+            countdown = TIMEOUT_SECONDS + (nodegroup_count * 
additional_seconds)
+            while 
len(eks_hook.list_nodegroups(clusterName=self.clusterName).get('nodegroups')) > 
0:
+                if countdown >= CHECK_INTERVAL_SECONDS:
+                    countdown -= CHECK_INTERVAL_SECONDS
+                    sleep(CHECK_INTERVAL_SECONDS)
+                    self.log.info(
+                        "Waiting for the remaining %s nodegroups to delete.  
Checking again in %d seconds.",
+                        nodegroup_count,
+                        CHECK_INTERVAL_SECONDS,
+                    )
+                else:
+                    message = "Nodegroups are still inactive after the 
allocated time limit.  Aborting."
+                    self.log.error(message)
+                    raise RuntimeError(message)
+
+        self.log.info("No nodegroups remain, deleting cluster.")
+        return eks_hook.delete_cluster(name=self.clusterName)
+
+
+class EKSDeleteNodegroupOperator(BaseOperator):
+    """
+    Deletes an Amazon EKS Nodegroup from an Amazon EKS Cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EKSDeleteNodegroupOperator`
+
+    :param cluster_name: The name of the Amazon EKS Cluster that is associated 
with your nodegroup.
+    :type cluster_name: str
+    :param nodegroup_name: The name of the nodegroup to delete.
+    :type nodegroup_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        nodegroup_name: str,
+        conn_id: Optional[str] = CONN_ID,
+        region: Optional[str] = REGION,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.clusterName = cluster_name
+        self.nodegroupName = nodegroup_name
+        self.conn_id = conn_id
+        self.region = region
+
+    def execute(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.conn_id,
+            region_name=self.region,
+        )
+
+        return eks_hook.delete_nodegroup(clusterName=self.clusterName, 
nodegroupName=self.nodegroupName)
+
+
+class EKSDescribeAllClustersOperator(BaseOperator):
+    """
+    Describes all Amazon EKS Clusters in your AWS account.
+
+    :param max_results: The maximum number of results to return.
+    :type max_results: int
+    :param next_token: The nextToken value returned from a previous paginated 
execution.

Review comment:
       I see.  I implemented the hooks to follow the API as closely as 
possible.   To make sure I am on the same page before I change anything, you 
think it would be better for the hook to just return everything and not allow 
pagination at all?

##########
File path: airflow/providers/amazon/aws/utils/eks_kube_config.py
##########
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from shutil import which
+from typing import Optional
+
+import boto3
+import yaml
+
+HOME = os.environ.get('HOME', '/tmp')
+DEFAULT_KUBE_CONFIG_FILENAME = 'config'
+DEFAULT_KUBE_CONFIG_PATH = str(os.path.join(HOME, '/.kube/', 
DEFAULT_KUBE_CONFIG_FILENAME))
+DEFAULT_CONTEXT_NAME = 'aws'
+DEFAULT_NAMESPACE_NAME = 'default'
+DEFAULT_POD_USERNAME = 'aws'
+
+
+def generate_config_file(
+    eks_cluster_name: str,
+    eks_namespace_name: str,
+    aws_profile: Optional[str],
+    kube_config_file_location: Optional[str] = DEFAULT_KUBE_CONFIG_PATH,
+    pod_username: Optional[str] = DEFAULT_POD_USERNAME,
+    pod_context: Optional[str] = DEFAULT_CONTEXT_NAME,
+    role_arn: Optional[str] = None,
+    aws_region: Optional[str] = None,
+) -> None:
+    """
+    Writes the kubeconfig file given an EKS Cluster name, AWS region, and file 
path.
+
+    :param eks_cluster_name: The name of the cluster to create the EKS Managed 
Nodegroup in.
+    :type eks_cluster_name: str
+    :param eks_namespace_name: The namespace to run within kubernetes.
+    :type eks_namespace_name: str
+    :param aws_profile: The named profile containing the credentials for the 
AWS CLI tool to use.
+    :type aws_profile: str
+    :param kube_config_file_location: Path to save the generated kube_config 
file to.
+    :type kube_config_file_location: str
+    :param pod_username: The username under which to execute the pod.
+    :type pod_username: str
+    :param pod_context: The name of the context access parameters to use.
+    :type pod_context: str
+    :param role_arn: The Amazon Resource Name (ARN) of the IAM role to 
associate with your nodegroup.
+    :type role_arn: str
+    :param aws_region: The name of the AWS Region the EKS Cluster resides in.
+    :type aws_region: str
+    """
+    installed = which("aws")
+    if installed is None:
+        message = (
+            "AWS CLI version 2 must be installed on the worker.  See: "
+            
"https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html";
+        )
+        print(message)
+        raise UnmetDependency(message)
+
+    # Set up the client
+    session = boto3.Session(region_name=aws_region, profile_name=aws_profile)
+    eks_client = session.client("eks")
+
+    # get cluster details
+    cluster = eks_client.describe_cluster(name=eks_cluster_name)
+    cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+    cluster_ep = cluster["cluster"]["endpoint"]
+
+    # build the cluster config hash
+    cli_args = [
+        "--region",
+        aws_region,
+        "eks",
+        "get-token",
+        "--cluster-name",
+        eks_cluster_name,
+    ]
+    if role_arn:
+        cli_args.extend(["--role-arn", role_arn])

Review comment:
       Then I am missing what makes it ok for GKE users to have to install the 
gcloud cli tool but not for EKS users to have to install the aws cli tool.

##########
File path: airflow/providers/amazon/aws/utils/eks_kube_config.py
##########
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from shutil import which
+from typing import Optional
+
+import boto3
+import yaml
+
+HOME = os.environ.get('HOME', '/tmp')
+DEFAULT_KUBE_CONFIG_FILENAME = 'config'
+DEFAULT_KUBE_CONFIG_PATH = str(os.path.join(HOME, '/.kube/', 
DEFAULT_KUBE_CONFIG_FILENAME))
+DEFAULT_CONTEXT_NAME = 'aws'
+DEFAULT_NAMESPACE_NAME = 'default'
+DEFAULT_POD_USERNAME = 'aws'
+
+
+def generate_config_file(
+    eks_cluster_name: str,
+    eks_namespace_name: str,
+    aws_profile: Optional[str],
+    kube_config_file_location: Optional[str] = DEFAULT_KUBE_CONFIG_PATH,
+    pod_username: Optional[str] = DEFAULT_POD_USERNAME,
+    pod_context: Optional[str] = DEFAULT_CONTEXT_NAME,
+    role_arn: Optional[str] = None,
+    aws_region: Optional[str] = None,
+) -> None:
+    """
+    Writes the kubeconfig file given an EKS Cluster name, AWS region, and file 
path.
+
+    :param eks_cluster_name: The name of the cluster to create the EKS Managed 
Nodegroup in.
+    :type eks_cluster_name: str
+    :param eks_namespace_name: The namespace to run within kubernetes.
+    :type eks_namespace_name: str
+    :param aws_profile: The named profile containing the credentials for the 
AWS CLI tool to use.
+    :type aws_profile: str
+    :param kube_config_file_location: Path to save the generated kube_config 
file to.
+    :type kube_config_file_location: str
+    :param pod_username: The username under which to execute the pod.
+    :type pod_username: str
+    :param pod_context: The name of the context access parameters to use.
+    :type pod_context: str
+    :param role_arn: The Amazon Resource Name (ARN) of the IAM role to 
associate with your nodegroup.
+    :type role_arn: str
+    :param aws_region: The name of the AWS Region the EKS Cluster resides in.
+    :type aws_region: str
+    """
+    installed = which("aws")
+    if installed is None:
+        message = (
+            "AWS CLI version 2 must be installed on the worker.  See: "
+            
"https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html";
+        )
+        print(message)
+        raise UnmetDependency(message)
+
+    # Set up the client
+    session = boto3.Session(region_name=aws_region, profile_name=aws_profile)
+    eks_client = session.client("eks")
+
+    # get cluster details
+    cluster = eks_client.describe_cluster(name=eks_cluster_name)
+    cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+    cluster_ep = cluster["cluster"]["endpoint"]
+
+    # build the cluster config hash
+    cli_args = [
+        "--region",
+        aws_region,
+        "eks",
+        "get-token",
+        "--cluster-name",
+        eks_cluster_name,
+    ]
+    if role_arn:
+        cli_args.extend(["--role-arn", role_arn])

Review comment:
       I still agree that it would be best to drop the dependency on the AWS 
CLI, I was just trying to understand the difference between the two cases... 
and that answers it. :)
   
   My main concern with most of the alternatives/workarounds that I found 
online was that if something changes in the way STS vends the tokens then we're 
stuck with a broken product and trying to catch up.  We can (safely???) assume 
that the AWS CLI would always have the correct current means to get the token.  
The AWS CLI code is open source, it's trivial to just copy/paste the relevant 
portions here, but that is asking for a problem, later IMHO.
   
   I'm going to try to implement your suggested `get_bearer_token` code and see 
where that takes me.  I appreciate your patience explaining the issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to