ferruzzi commented on a change in pull request #16571:
URL: https://github.com/apache/airflow/pull/16571#discussion_r656381843



##########
File path: tests/providers/amazon/aws/hooks/test_eks.py
##########
@@ -0,0 +1,872 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import json
+import unittest
+from copy import deepcopy
+from typing import Dict, List, Optional, Tuple, Type
+from unittest import mock
+from urllib.parse import ParseResult, urlparse
+
+import pytest
+from _pytest._code import ExceptionInfo
+from botocore.exceptions import ClientError
+from freezegun import freeze_time
+from moto.core import ACCOUNT_ID
+from moto.core.exceptions import AWSError
+from moto.eks.exceptions import (
+    InvalidParameterException,
+    InvalidRequestException,
+    ResourceInUseException,
+    ResourceNotFoundException,
+)
+from moto.eks.models import (
+    CLUSTER_EXISTS_MSG,
+    CLUSTER_IN_USE_MSG,
+    CLUSTER_NOT_FOUND_MSG,
+    CLUSTER_NOT_READY_MSG,
+    LAUNCH_TEMPLATE_WITH_DISK_SIZE_MSG,
+    LAUNCH_TEMPLATE_WITH_REMOTE_ACCESS_MSG,
+    NODEGROUP_EXISTS_MSG,
+    NODEGROUP_NOT_FOUND_MSG,
+)
+
+from airflow.providers.amazon.aws.hooks.eks import EKSHook
+
+from ..utils.test_eks_constants import (
+    CONN_ID,
+    DEFAULT_MAX_RESULTS,
+    DISK_SIZE,
+    FROZEN_TIME,
+    INSTANCE_TYPES,
+    LAUNCH_TEMPLATE,
+    PACKAGE_NOT_PRESENT_MSG,
+    PARTITIONS,
+    REGION,
+    REMOTE_ACCESS,
+    BatchCountSize,
+    ClusterAttributes,
+    ClusterInputs,
+    ErrorAttributes,
+    NodegroupAttributes,
+    NodegroupInputs,
+    PageCount,
+    PossibleTestResults,
+    RegExTemplates,
+    ResponseAttribute,
+)
+from ..utils.test_eks_utils import (
+    attributes_to_test,
+    generate_clusters,
+    generate_nodegroups,
+    random_names,
+    region_matches_partition,
+)
+
+try:
+    from moto import mock_eks
+except ImportError:
+    mock_eks = None
+
+
+@pytest.fixture(scope="function")
+def cluster_builder():
+    """
+    A fixture to generate a batch of EKS Clusters on the mocked backend for testing.
+
+    Yields a callable ``_execute(count=1, minimal=True)`` which creates ``count``
+    Clusters and returns a ``(hook, test_data)`` tuple.  The EKS mock is active
+    from fixture setup until fixture teardown.
+    """
+
+    class ClusterTestDataFactory:
+        """A Factory class for building the Cluster objects."""
+
+        def __init__(self, count: int, minimal: bool) -> None:
+            # Generate 'count' number of random Cluster objects.
+            self.cluster_names: List[str] = generate_clusters(
+                eks_hook=eks_hook, num_clusters=count, minimal=minimal
+            )
+
+            # Get the name of the first generated Cluster.
+            first_name: str = self.cluster_names[0]
+
+            # Collect the output of describe_cluster() for the first Cluster.
+            # The response is JSON-decoded here, so the stored value is not a string.
+            self.cluster_describe_output: Dict = json.loads(eks_hook.describe_cluster(name=first_name))[
+                ResponseAttribute.CLUSTER
+            ]
+
+            # Pick a random Cluster name from the list and a name guaranteed not to be on the list.
+            self.existing_cluster_name, self.nonexistent_cluster_name = random_names(
+                name_list=self.cluster_names
+            )
+
+            # Generate a list of the Cluster attributes to be tested when validating results.
+            self.attributes_to_test: List[Tuple] = attributes_to_test(
+                inputs=ClusterInputs, cluster_name=self.existing_cluster_name
+            )
+
+    def _execute(count: int = 1, minimal: bool = True) -> Tuple[EKSHook, ClusterTestDataFactory]:
+        # 'eks_hook' is resolved at call time from the enclosing fixture scope (assigned below).
+        return eks_hook, ClusterTestDataFactory(count=count, minimal=minimal)
+
+    # Enter a single mock instance as a context manager so that the very same
+    # object that was started is the one stopped on teardown (calling
+    # mock_eks() twice would start one instance and stop a different one).
+    with mock_eks():
+        eks_hook = EKSHook(
+            aws_conn_id=CONN_ID,
+            region_name=REGION,
+        )
+        yield _execute
+
+
+@pytest.fixture(scope="function")
+def nodegroup_builder(cluster_builder):
+    """
+    A fixture to generate a batch of EKS Managed Nodegroups on the mocked backend for testing.
+
+    Builds one Cluster through ``cluster_builder`` and returns a callable
+    ``_execute(count=1, minimal=True)`` which creates ``count`` Nodegroups in that
+    Cluster and returns a ``(hook, test_data)`` tuple.
+    """
+
+    class NodegroupTestDataFactory:
+        """A Factory class for building the Nodegroup objects."""
+
+        def __init__(self, count: int, minimal: bool) -> None:
+            # All Nodegroups are created in the Cluster built by cluster_builder.
+            self.cluster_name: str = cluster.existing_cluster_name
+
+            # Generate 'count' number of random Nodegroup objects.
+            self.nodegroup_names: List[str] = generate_nodegroups(
+                eks_hook=eks_hook, cluster_name=self.cluster_name, num_nodegroups=count, minimal=minimal
+            )
+
+            # Get the name of the first generated Nodegroup.
+            self.first_name: str = self.nodegroup_names[0]
+
+            # Collect the output of describe_nodegroup() for the first Nodegroup.
+            self.nodegroup_describe_output: Dict = json.loads(
+                eks_hook.describe_nodegroup(clusterName=self.cluster_name, nodegroupName=self.first_name)
+            )[ResponseAttribute.NODEGROUP]
+
+            # Pick a random Nodegroup name from the list and a name guaranteed not to be on the list.
+            self.existing_nodegroup_name, self.nonexistent_nodegroup_name = random_names(
+                name_list=self.nodegroup_names
+            )
+            # Only the "nonexistent" half of the pair is needed for the Cluster name.
+            _, self.nonexistent_cluster_name = random_names(name_list=[self.cluster_name])
+
+            # Generate a list of the Nodegroup attributes to be tested when validating results.
+            self.attributes_to_test: List[Tuple] = attributes_to_test(
+                inputs=NodegroupInputs,
+                cluster_name=self.cluster_name,
+                nodegroup_name=self.existing_nodegroup_name,
+            )
+
+    def _execute(count: int = 1, minimal: bool = True) -> Tuple[EKSHook, NodegroupTestDataFactory]:
+        # 'eks_hook' and 'cluster' are resolved at call time from the enclosing fixture scope.
+        return eks_hook, NodegroupTestDataFactory(count=count, minimal=minimal)
+
+    # Called with its defaults (count=1, minimal=True) to build the parent Cluster.
+    eks_hook, cluster = cluster_builder()
+    return _execute
+
+
+@pytest.mark.skipif(mock_eks is None, reason=PACKAGE_NOT_PRESENT_MSG)

Review comment:
       Thanks, I'll incorporate that in my coming revision.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to