This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d8db36a1b1bee478af4b4ca3e8ac6471da557a96
Author: Jarek Potiuk <jarek.pot...@polidea.com>
AuthorDate: Wed Nov 11 17:15:02 2020 +0100

    Added k9s as integrated tool to help with kubernetes testing (#12163)

    K9s is a fantastic tool that helps to debug a running k8s instance. It is
    a terminal-based, windowed CLI that makes you several times more productive
    compared to using kubectl commands. We've integrated k9s (it is run as a
    docker container and downloaded on demand). We've also separated out the
    KUBECONFIG of the integrated kind cluster so that it does not interfere
    with any kubernetes configuration you might already have.

    Along with that, the "surrounding" of the kubernetes tests was simplified
    and improved so that the k9s integration can be utilized well. Instead of
    kubectl port forwarding (which caused a multitude of problems) we now use
    kind's portMapping feature plus a custom NodePort resource: host port 8080
    maps to NodePort 30007, which in turn maps to port 8080 of the webserver.
    This way we do not have to establish an external kubectl port forward,
    which is error-prone and needs management - everything is brought up when
    Airflow is deployed to the Kind cluster and shut down when the Kind
    cluster is stopped.

    Yet another problem fixed was one of the kubernetes tests
    ('test_integration_run_dag_with_scheduler_failure') killing postgres.
    Instead of killing just the scheduler, it killed all pods - including the
    Postgres one (it was named 'airflow-postgres.*'). That caused various
    problems, as the database could be left in a strange state. I changed the
    test to do what it claimed to be doing - killing only the scheduler during
    the test. This seemed to improve the stability of the tests immensely in
    my local setup.
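The scheduler-only kill described in the message comes down to narrowing the pod-name filter by a component suffix (see the `_delete_airflow_pod(name='scheduler')` change in the diff below). A minimal sketch of that filtering logic, run against a hypothetical `kubectl get pods` listing instead of shelling out to kubectl:

```python
import re


def find_airflow_pods(kubectl_output, name=''):
    """Return airflow pod names from `kubectl get pods` output,
    optionally narrowed to one component (e.g. 'scheduler')."""
    suffix = '-' + name if name else ''
    lines = kubectl_output.split('\n')
    # First whitespace-separated column of each matching line is the pod name.
    return [re.split(r'\s+', line)[0] for line in lines if 'airflow' + suffix in line]


# Hypothetical listing - pod names are illustrative, not from a real cluster.
SAMPLE = """NAME                       READY   STATUS    RESTARTS   AGE
airflow-postgres-0         1/1     Running   0          5m
airflow-scheduler-abc123   1/1     Running   0          5m
airflow-webserver-def456   1/1     Running   0          5m"""

# Matching 'airflow-scheduler' leaves the postgres pod untouched.
print(find_airflow_pods(SAMPLE, 'scheduler'))  # ['airflow-scheduler-abc123']
# The old behavior matched every airflow pod, postgres included.
print(find_airflow_pods(SAMPLE))
```

This mirrors why the unsuffixed filter killed postgres: plain `'airflow'` matches `airflow-postgres-0` too.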
(cherry picked from commit 21999dd56e9dbe9f7f9e25961954c5677c3c7c58) --- .github/workflows/ci.yml | 17 +- BREEZE.rst | 11 +- TESTING.rst | 151 ++++- breeze | 9 + breeze-complete | 5 +- chart/requirements.lock | 4 +- images/testing/k9s.png | Bin 0 -> 238713 bytes images/testing/kubeconfig-env.png | Bin 0 -> 231280 bytes images/testing/kubernetes-virtualenv.png | Bin 0 -> 110011 bytes images/testing/pytest-runner.png | Bin 0 -> 131589 bytes images/testing/run-test.png | Bin 0 -> 140728 bytes kubernetes_tests/test_kubernetes_executor.py | 7 +- kubernetes_tests/test_kubernetes_pod_operator.py | 672 ++++++++------------- scripts/ci/kubernetes/ci_run_kubernetes_tests.sh | 7 +- ...up_cluster_and_deploy_airflow_to_kubernetes.sh} | 3 +- scripts/ci/kubernetes/kind-cluster-conf.yaml | 5 + .../{kind-cluster-conf.yaml => nodeport.yaml} | 30 +- ...oy_app_to_kubernetes.sh => redeploy_airflow.sh} | 6 +- scripts/ci/libraries/_kind.sh | 126 ++-- 19 files changed, 516 insertions(+), 537 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6c854a1..81890a7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -636,23 +636,14 @@ jobs: python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }} - name: "Free space" run: ./scripts/ci/tools/ci_free_space_on_ci.sh - - name: "Setup Kind Cluster ${{ env.KIND_VERSION }}" - uses: engineerd/setup-kind@v0.4.0 - with: - version: "${{ env.KIND_VERSION }}" - name: airflow-python-${{matrix.python-version}}-${{matrix.kubernetes-version}} - config: "scripts/ci/kubernetes/kind-cluster-conf.yaml" - name: "Prepare PROD Image" run: ./scripts/ci/images/ci_prepare_prod_image_on_ci.sh - - name: "Deploy airflow to cluster" - id: deploy-app - run: ./scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh + - name: "Setup cluster and deploy Airflow" + id: setp-cluster-deploy-app + run: ./scripts/ci/kubernetes/ci_setup_cluster_and_deploy_airflow_to_kubernetes.sh env: # We have the right image pulled already by the previous 
step SKIP_BUILDING_PROD_IMAGE: "true" - # due to some instabilities, in CI we try to increase port numbers when trying to establish - # port forwarding - INCREASE_PORT_NUMBER_FOR_KUBERNETES: "true" - name: "Cache virtualenv for kubernetes testing" uses: actions/cache@v2 env: @@ -669,7 +660,7 @@ jobs: key: "${{ env.cache-name }}-${{ github.job }}-${{ hashFiles('setup.py') }}\ -${{ needs.build-info.outputs.defaultKindVersion }}\ -${{ needs.build-info.outputs.defaultHelmVersion }}\ --$${{ matrix.kubernetes-version }}" +-${{ matrix.kubernetes-version }}" - name: "Kubernetes Tests" run: ./scripts/ci/kubernetes/ci_run_kubernetes_tests.sh - name: "Upload KinD logs" diff --git a/BREEZE.rst b/BREEZE.rst index c3c2d95..ce7dc6a 100644 --- a/BREEZE.rst +++ b/BREEZE.rst @@ -1188,6 +1188,7 @@ This is the current syntax for `./breeze <./breeze>`_: image building time in production image and at container entering time for CI image. One of: 1.10.12 1.10.11 1.10.10 1.10.9 1.10.8 1.10.7 1.10.6 1.10.5 1.10.4 1.10.3 1.10.2 + wheel -t, --install-airflow-reference INSTALL_AIRFLOW_REFERENCE If specified, installs Airflow directly from reference in GitHub. This happens at @@ -1712,7 +1713,14 @@ This is the current syntax for `./breeze <./breeze>`_: to the cluster so you can also pass appropriate build image flags that will influence rebuilding the production image. Operation is one of: - start stop restart status deploy test shell + start stop restart status deploy test shell k9s + + The last two operations - shell and k9s allow you to perform interactive testing with + kubernetes tests. You can enter the shell from which you can run kubernetes tests and in + another terminal you can start the k9s CLI to debug kubernetes instance. It is an easy + way to debug the kubernetes deployments. 
+ + You can read more about k9s at https://k9scli.io/ Flags: @@ -2087,6 +2095,7 @@ This is the current syntax for `./breeze <./breeze>`_: image building time in production image and at container entering time for CI image. One of: 1.10.12 1.10.11 1.10.10 1.10.9 1.10.8 1.10.7 1.10.6 1.10.5 1.10.4 1.10.3 1.10.2 + wheel -t, --install-airflow-reference INSTALL_AIRFLOW_REFERENCE If specified, installs Airflow directly from reference in GitHub. This happens at diff --git a/TESTING.rst b/TESTING.rst index c8b170b..f6c6c10 100644 --- a/TESTING.rst +++ b/TESTING.rst @@ -418,6 +418,14 @@ can also decide to only run tests with ``-m quarantined`` flag to run only those Running Tests with Kubernetes ============================= +Airflow has tests that are run against real kubernetes cluster. We are using +`Kind <https://kind.sigs.k8s.io/>`_ to create and run the cluster. We integrated the tools to start/stop/ +deploy and run the cluster tests in our repository and into Breeze development environment. + +Configuration for the cluster is kept in ``./build/.kube/config`` file in your Airflow source repository and +our scripts set the ``KUBECONFIG`` variable to it. If you want to interact with the Kind cluster created +you can do it from outside of the scripts by exporting this variable and point it to this file. + Starting Kubernetes Cluster --------------------------- @@ -425,7 +433,7 @@ For your testing you manage Kind cluster with ``kind-cluster`` breeze command: .. code-block:: bash - ./breeze kind-cluster [ start | stop | recreate | status | deploy | test | shell ] + ./breeze kind-cluster [ start | stop | recreate | status | deploy | test | shell | k9s ] The command allows you to start/stop/recreate/status Kind Kubernetes cluster, deploy Airflow via Helm chart as well as interact with the cluster (via test and shell commands). 
@@ -444,11 +452,11 @@ Deploying Airflow to Kubernetes Cluster Deploying Airflow to the Kubernetes cluster created is also done via ``kind-cluster deploy`` breeze command: -.. code-block:: bash` +.. code-block:: bash ./breeze kind-cluster deploy -The deploy commands performs tthose steps: +The deploy commands performs those steps: 1. It rebuilds the latest ``apache/airflow:master-pythonX.Y`` production images using the latest sources using local cachine. It also adds example DAGs to the image, so that they do not @@ -465,20 +473,63 @@ Running tests with Kubernetes Cluster You can either run all tests or you can select which tests to run. You can also enter interactive virtualenv to run the tests manually one by one. -.. code-block:: bash +Running kubernetes tests via shell: - Running kubernetes tests +.. code-block:: bash ./scripts/ci/kubernetes/ci_run_kubernetes_tests.sh - runs all kubernetes tests ./scripts/ci/kubernetes/ci_run_kubernetes_tests.sh TEST [TEST ...] - runs selected kubernetes tests (from kubernetes_tests folder) + + +Running kubernetes tests via breeze: + +.. code-block:: bash + + ./breeze kind-cluster test + ./breeze kind-cluster test -- TEST TEST [TEST ...] + + +Entering shell with Kubernetes Cluster +-------------------------------------- + +This shell is prepared to run kubernetes tests interactively. It has ``kubectl`` and ``kind`` cli tools +available in the path, it has also activated virtualenv environment that allows you to run tests via pytest. + +You can enter the shell via those scripts + ./scripts/ci/kubernetes/ci_run_kubernetes_tests.sh [-i|--interactive] - Activates virtual environment ready to run tests and drops you in ./scripts/ci/kubernetes/ci_run_kubernetes_tests.sh [--help] - Prints this help message -You can also run the same tests command with Breeze, using ``kind-cluster test`` command (to run all -kubernetes tests) and with ``kind-cluster shell`` command you can enter interactive shell when you can -run tests. +.. 
code-block:: bash + + ./breeze kind-cluster shell + +K9s CLI - debug kubernetes in style! +------------------------------------ + +Breeze has built-in integration with fantastic k9s CLI tool, that allows you to debug the kubernetes +installation effortlessly and in style. K9S provides terminal (but windowed) CLI that allows you to +easily observe what's going on in the kubernetes instance, observe the resources defined (pods, secrets, +custom resource definitions), enter shell for the Pods/Containers running, see the log files and more. + +You can read more about k9s at `https://k9scli.io/ <https://k9scli.io/>`_ + +Here is the screenshot of k9s tools in operation: + +.. image:: images/testing/k9s.png + :align: center + :alt: K9S tool + + +You can enter the k9s tool via breeze (after you deployed Airflow): + +.. code-block:: bash + + ./breeze kind-cluster k9s + +You can exit k9s by pressing Ctrl-C. Typical testing pattern for Kubernetes tests -------------------------------------------- @@ -578,7 +629,6 @@ This prepares and enters the virtualenv in ``.build/.kubernetes_venv`` folder: ./breeze kind-cluster shell - Once you enter the environment you receive this information: @@ -595,12 +645,67 @@ Once you enter the environment you receive this information: You are entering the virtualenv now. Type exit to exit back to the original shell +In a separate terminal you can open the k9s CLI: + +.. code-block:: bash + + ./breeze kind-cluster k9s + +Use it to observe what's going on in your cluster. + +6. Debugging in IntelliJ/PyCharm + +It is very easy to running/debug Kubernetes tests with IntelliJ/PyCharm. Unlike the regular tests they are +in ``kubernetes_tests`` folder and if you followed the previous steps and entered the shell using +``./breeze kind-cluster shell`` command, you can setup your IDE very easily to run (and debug) your +tests using the standard IntelliJ Run/Debug feature. 
You just need a few steps: + +a) Add the virtualenv as interpreter for the project: + +.. image:: images/testing/kubernetes-virtualenv.png + :align: center + :alt: Kubernetes testing virtualenv + +The virtualenv is created in your "Airflow" source directory in ``.build/.kubernetes_venv/`` folder and you +have to find ``python`` binary and choose it when selecting interpreter. + +b) Choose pytest as test runner: + +.. image:: images/testing/pytest-runner.png + :align: center + :alt: Pytest runner + +c) Run/Debug tests using standard "Run/Debug" feature of IntelliJ + +.. image:: images/testing/run-tests.png + :align: center + :alt: Run/Debug tests + + +NOTE! The first time you run it, it will likely fail with +``kubernetes.config.config_exception.ConfigException``: +``Invalid kube-config file. Expected key current-context in kube-config``. You need to add KUBECONFIG +environment variabl copying it from the result of "./breeze kind-cluster test": + +.. code-block:: bash + + echo ${KUBECONFIG} + + /home/jarek/code/airflow/.build/.kube/config + + +.. image:: images/testing/kubeconfig-env.png + :align: center + :alt: Run/Debug tests + + +The configuration for kubernetes is stored in your "Airflow" source directory in ".build/.kube/config" file +and this is where KUBECONFIG env should point to. You can iterate with tests while you are in the virtualenv. All the tests requiring kubernetes cluster are in "kubernetes_tests" folder. You can add extra ``pytest`` parameters then (for example ``-s`` will print output generated test logs and print statements to the terminal immediately. - .. code-block:: bash pytest kubernetes_tests/test_kubernetes_executor.py::TestKubernetesExecutor::test_integration_run_dag_with_scheduler_failure -s @@ -609,6 +714,30 @@ print output generated test logs and print statements to the terminal immediatel You can modify the tests or KubernetesPodOperator and re-run them without re-deploying airflow to KinD cluster. 
+ +Sometimes there are side effects from running tests. You can run ``redeploy_airflow.sh`` without +recreating the whole cluster. This will delete the whole namespace, including the database data +and start a new Airflow deployment in the cluster. + +.. code-block:: bash + + ./scripts/ci/redeploy_airflow.sh + +If needed you can also delete the cluster manually: + + +.. code-block:: bash + + kind get clusters + kind delete clusters <NAME_OF_THE_CLUSTER> + +Kind has also useful commands to inspect your running cluster: + +.. code-block:: text + + kind --help + + However, when you change Airflow Kubernetes executor implementation you need to redeploy Airflow to the cluster. @@ -617,7 +746,7 @@ Airflow to the cluster. ./breeze kind-cluster deploy -5. Stop KinD cluster when you are done +7. Stop KinD cluster when you are done .. code-block:: bash diff --git a/breeze b/breeze index 3498c64..55412d0 100755 --- a/breeze +++ b/breeze @@ -1684,6 +1684,13 @@ ${CMDNAME} kind-cluster [FLAGS] OPERATION ${FORMATTED_KIND_OPERATIONS} + The last two operations - shell and k9s allow you to perform interactive testing with + kubernetes tests. You can enter the shell from which you can run kubernetes tests and in + another terminal you can start the k9s CLI to debug kubernetes instance. It is an easy + way to debug the kubernetes deployments. 
+ + You can read more about k9s at https://k9scli.io/ + Flags: $(breeze::flag_airflow_variants) $(breeze::flag_build_docker_images) @@ -2901,6 +2908,8 @@ function breeze::run_build_command() { echo "Run Kubernetes tests with the KinD cluster " elif [[ ${KIND_CLUSTER_OPERATION} == "shell" ]]; then echo "Enter an interactive shell for kubernetes testing" + elif [[ ${KIND_CLUSTER_OPERATION} == "k9s" ]]; then + echo "Run k9s cli to debug in style" elif [[ -z ${KIND_CLUSTER_OPERATION=} ]]; then echo echo "Please provide an operation to run" diff --git a/breeze-complete b/breeze-complete index 4855c86..94854ad 100644 --- a/breeze-complete +++ b/breeze-complete @@ -32,7 +32,7 @@ _breeze_allowed_helm_versions="v3.2.4" _breeze_allowed_kind_versions="v0.8.0" _breeze_allowed_mysql_versions="5.6 5.7" _breeze_allowed_postgres_versions="9.6 10 11 12 13" -_breeze_allowed_kind_operations="start stop restart status deploy test shell" +_breeze_allowed_kind_operations="start stop restart status deploy test shell k9s" _breeze_allowed_test_types="All Core Integration Heisentests Postgres MySQL Helm" # shellcheck disable=SC2034 @@ -60,6 +60,7 @@ _breeze_allowed_install_airflow_versions=$(cat <<-EOF 1.10.4 1.10.3 1.10.2 +wheel EOF ) @@ -134,7 +135,7 @@ _breeze_long_options=" help python: backend: integration: kubernetes-mode: kubernetes-version: helm-version: kind-version: skip-mounting-local-sources install-airflow-version: install-airflow-reference: db-reset -verbose assume-yes assume-no assume-quit forward-credentials rbac-ui init-script: +verbose assume-yes assume-no assume-quit forward-credentials init-script: force-build-images force-pull-images production-image extras: force-clean-images skip-rebuild-check build-cache-local build-cache-pulled build-cache-disabled disable-pip-cache dockerhub-user: dockerhub-repo: github-registry github-repository: github-image-id: diff --git a/chart/requirements.lock b/chart/requirements.lock index 3f3c34a..e460e9f 100644 --- 
a/chart/requirements.lock +++ b/chart/requirements.lock @@ -2,5 +2,5 @@ dependencies: - name: postgresql repository: https://charts.helm.sh/stable/ version: 6.3.12 -digest: sha256:58d88cf56e78b2380091e9e16cc6ccf58b88b3abe4a1886dd47cd9faef5309af -generated: "2020-11-04T15:59:36.967913-08:00" +digest: sha256:1748aa702050d4e72ffba1b18960f49bfe5368757cf976116afeffbdedda1c98 +generated: "2020-11-07T17:40:45.418723358+01:00" diff --git a/images/testing/k9s.png b/images/testing/k9s.png new file mode 100644 index 0000000..a8eec97 Binary files /dev/null and b/images/testing/k9s.png differ diff --git a/images/testing/kubeconfig-env.png b/images/testing/kubeconfig-env.png new file mode 100644 index 0000000..b2ebfd5 Binary files /dev/null and b/images/testing/kubeconfig-env.png differ diff --git a/images/testing/kubernetes-virtualenv.png b/images/testing/kubernetes-virtualenv.png new file mode 100644 index 0000000..6e208d6 Binary files /dev/null and b/images/testing/kubernetes-virtualenv.png differ diff --git a/images/testing/pytest-runner.png b/images/testing/pytest-runner.png new file mode 100644 index 0000000..fdb48cc Binary files /dev/null and b/images/testing/pytest-runner.png differ diff --git a/images/testing/run-test.png b/images/testing/run-test.png new file mode 100644 index 0000000..21a5c9d Binary files /dev/null and b/images/testing/run-test.png differ diff --git a/kubernetes_tests/test_kubernetes_executor.py b/kubernetes_tests/test_kubernetes_executor.py index 694cf75..bb89cb7 100644 --- a/kubernetes_tests/test_kubernetes_executor.py +++ b/kubernetes_tests/test_kubernetes_executor.py @@ -64,10 +64,11 @@ class TestKubernetesExecutor(unittest.TestCase): return len(names) @staticmethod - def _delete_airflow_pod(): + def _delete_airflow_pod(name=''): + suffix = '-' + name if name else '' air_pod = check_output(['kubectl', 'get', 'pods']).decode() air_pod = air_pod.split('\n') - names = [re.compile(r'\s+').split(x)[0] for x in air_pod if 'airflow' in x] + names = 
[re.compile(r'\s+').split(x)[0] for x in air_pod if 'airflow' + suffix in x] if names: check_call(['kubectl', 'delete', 'pod', names[0]]) @@ -233,7 +234,7 @@ class TestKubernetesExecutor(unittest.TestCase): execution_date = self.start_job_in_kubernetes(dag_id, host) - self._delete_airflow_pod() + self._delete_airflow_pod("scheduler") time.sleep(10) # give time for pod to restart diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 7a8674a..c2a0c62 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -15,43 +15,38 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - import json import logging import os +import random import shutil import sys -import unittest import textwrap +import unittest +from unittest import mock +from unittest.mock import ANY -import kubernetes.client.models as k8s import pendulum +from kubernetes.client import models as k8s from kubernetes.client.api_client import ApiClient from kubernetes.client.rest import ApiException -from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator from airflow.exceptions import AirflowException from airflow.kubernetes import kube_client -from airflow.kubernetes.pod import Port from airflow.kubernetes.pod_generator import PodDefaults from airflow.kubernetes.pod_launcher import PodLauncher -from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv from airflow.kubernetes.secret import Secret -from airflow.kubernetes.volume import Volume -from airflow.kubernetes.volume_mount import VolumeMount from airflow.models import DAG, TaskInstance +from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator from airflow.utils import timezone from airflow.version import version as airflow_version -from tests.compat import mock, patch -# 
noinspection DuplicatedCode def create_context(task): dag = DAG(dag_id="dag") tzinfo = pendulum.timezone("Europe/Amsterdam") execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo) - task_instance = TaskInstance(task=task, - execution_date=execution_date) + task_instance = TaskInstance(task=task, execution_date=execution_date) return { "dag": dag, "ts": execution_date.isoformat(), @@ -60,7 +55,11 @@ def create_context(task): } -# noinspection DuplicatedCode,PyUnusedLocal +def get_kubeconfig_path(): + kubeconfig_path = os.environ.get('KUBECONFIG') + return kubeconfig_path if kubeconfig_path else os.path.expanduser('~/.kube/config') + + class TestKubernetesPodOperatorSystem(unittest.TestCase): def get_current_task_name(self): # reverse test name to make pod name unique (it has limited length) @@ -74,29 +73,33 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): 'kind': 'Pod', 'metadata': { 'namespace': 'default', - 'name': mock.ANY, + 'name': ANY, 'annotations': {}, 'labels': { - 'foo': 'bar', 'kubernetes_pod_operator': 'True', + 'foo': 'bar', + 'kubernetes_pod_operator': 'True', 'airflow_version': airflow_version.replace('+', '-'), 'execution_date': '2016-01-01T0100000100-a2f50a31f', 'dag_id': 'dag', - 'task_id': 'task', - 'try_number': '1'}, + 'task_id': ANY, + 'try_number': '1', + }, }, 'spec': { 'affinity': {}, - 'containers': [{ - 'image': 'ubuntu:16.04', - 'args': ["echo 10"], - 'command': ["bash", "-cx"], - 'env': [], - 'imagePullPolicy': 'IfNotPresent', - 'envFrom': [], - 'name': 'base', - 'ports': [], - 'volumeMounts': [], - }], + 'containers': [ + { + 'image': 'ubuntu:16.04', + 'args': ["echo 10"], + 'command': ["bash", "-cx"], + 'env': [], + 'envFrom': [], + 'resources': {}, + 'name': 'base', + 'ports': [], + 'volumeMounts': [], + } + ], 'hostNetwork': False, 'imagePullSecrets': [], 'initContainers': [], @@ -106,29 +109,19 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): 'serviceAccountName': 'default', 'tolerations': [], 
'volumes': [], - } + }, } def tearDown(self): client = kube_client.get_kube_client(in_cluster=False) client.delete_collection_namespaced_pod(namespace="default") + import time - def create_context(self, task): - dag = DAG(dag_id="dag") - tzinfo = pendulum.timezone("Europe/Amsterdam") - execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo) - task_instance = TaskInstance(task=task, - execution_date=execution_date) - return { - "dag": dag, - "ts": execution_date.isoformat(), - "task": task, - "ti": task_instance, - } + time.sleep(1) def test_do_xcom_push_defaults_false(self): new_config_path = '/tmp/kube_config' - old_config_path = os.path.expanduser('~/.kube/config') + old_config_path = get_kubeconfig_path() shutil.copy(old_config_path, new_config_path) k = KubernetesPodOperator( @@ -137,8 +130,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, config_file=new_config_path, @@ -147,7 +140,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): def test_config_path_move(self): new_config_path = '/tmp/kube_config' - old_config_path = os.path.expanduser('~/.kube/config') + old_config_path = get_kubeconfig_path() shutil.copy(old_config_path, new_config_path) k = KubernetesPodOperator( @@ -157,103 +150,16 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): arguments=["echo 10"], labels={"foo": "bar"}, name="test1", - task_id="task", + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, config_file=new_config_path, ) - context = self.create_context(k) + context = create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.assertEqual(self.expected_pod, actual_pod) - 
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") - @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") - @mock.patch("airflow.kubernetes.kube_client.get_kube_client") - def test_config_path(self, client_mock, monitor_mock, start_mock): # pylint: disable=unused-argument - from airflow.utils.state import State - - file_path = "/tmp/fake_file" - k = KubernetesPodOperator( - namespace='default', - image="ubuntu:16.04", - cmds=["bash", "-cx"], - arguments=["echo 10"], - labels={"foo": "bar"}, - name="test", - task_id="task", - in_cluster=False, - do_xcom_push=False, - config_file=file_path, - cluster_context='default', - ) - monitor_mock.return_value = (State.SUCCESS, None) - client_mock.list_namespaced_pod.return_value = [] - context = self.create_context(k) - k.execute(context=context) - client_mock.assert_called_once_with( - in_cluster=False, - cluster_context='default', - config_file=file_path, - ) - - @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") - @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") - @mock.patch("airflow.kubernetes.kube_client.get_kube_client") - def test_image_pull_secrets_correctly_set(self, mock_client, monitor_mock, start_mock): - from airflow.utils.state import State - - fake_pull_secrets = "fakeSecret" - k = KubernetesPodOperator( - namespace='default', - image="ubuntu:16.04", - cmds=["bash", "-cx"], - arguments=["echo 10"], - labels={"foo": "bar"}, - name="test", - task_id="task", - in_cluster=False, - do_xcom_push=False, - image_pull_secrets=fake_pull_secrets, - cluster_context='default', - ) - monitor_mock.return_value = (State.SUCCESS, None) - context = self.create_context(k) - k.execute(context=context) - self.assertEqual( - start_mock.call_args[0][0].spec.image_pull_secrets, - [k8s.V1LocalObjectReference(name=fake_pull_secrets)] - ) - - @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") - 
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") - @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.delete_pod") - @mock.patch("airflow.kubernetes.kube_client.get_kube_client") - def test_pod_delete_even_on_launcher_error( - self, - mock_client, - delete_pod_mock, - monitor_pod_mock, - start_pod_mock): # pylint: disable=unused-argument - k = KubernetesPodOperator( - namespace='default', - image="ubuntu:16.04", - cmds=["bash", "-cx"], - arguments=["echo 10"], - labels={"foo": "bar"}, - name="test", - task_id="task", - in_cluster=False, - do_xcom_push=False, - cluster_context='default', - is_delete_operator_pod=True, - ) - monitor_pod_mock.side_effect = AirflowException('fake failure') - with self.assertRaises(AirflowException): - context = self.create_context(k) - k.execute(context=context) - assert delete_pod_mock.called - def test_working_pod(self): k = KubernetesPodOperator( namespace='default', @@ -261,8 +167,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, ) @@ -279,49 +185,15 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, is_delete_operator_pod=True, ) - context = self.create_context(k) - k.execute(context) - actual_pod = self.api_client.sanitize_for_serialization(k.pod) - self.assertEqual(self.expected_pod['spec'], actual_pod['spec']) - self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels']) - - def test_pod_with_volume_secret(self): - k = KubernetesPodOperator( - namespace='default', - 
image="ubuntu:16.04", - cmds=["bash", "-cx"], - in_cluster=False, - labels={"foo": "bar"}, - arguments=["echo 10"], - secrets=[Secret( - deploy_type="volume", - deploy_target="/var/location", - secret="my-secret", - key="content.json", - )], - name="airflow-test-pod", - task_id="task", - get_logs=True, - is_delete_operator_pod=True, - ) - - context = self.create_context(k) + context = create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) - self.expected_pod['spec']['containers'][0]['volumeMounts'] = [ - {'mountPath': '/var/location', - 'name': mock.ANY, - 'readOnly': True}] - self.expected_pod['spec']['volumes'] = [ - {'name': mock.ANY, - 'secret': {'secretName': 'my-secret'}} - ] self.assertEqual(self.expected_pod['spec'], actual_pod['spec']) self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels']) @@ -332,13 +204,13 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, hostnetwork=True, ) - context = self.create_context(k) + context = create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.expected_pod['spec']['hostNetwork'] = True @@ -353,14 +225,14 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, hostnetwork=True, - dnspolicy=dns_policy + dnspolicy=dns_policy, ) - context = self.create_context(k) + context = create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.expected_pod['spec']['hostNetwork'] = 
True @@ -376,32 +248,28 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, - schedulername=scheduler_name + schedulername=scheduler_name, ) - context = self.create_context(k) + context = create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.expected_pod['spec']['schedulerName'] = scheduler_name self.assertEqual(self.expected_pod, actual_pod) - self.assertEqual(self.expected_pod['spec'], actual_pod['spec']) - self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels']) def test_pod_node_selectors(self): - node_selectors = { - 'beta.kubernetes.io/os': 'linux' - } + node_selectors = {'beta.kubernetes.io/os': 'linux'} k = KubernetesPodOperator( namespace='default', image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, node_selectors=node_selectors, @@ -413,40 +281,28 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): self.assertEqual(self.expected_pod, actual_pod) def test_pod_resources(self): - resources = { - 'limit_cpu': 0.25, - 'limit_memory': '64Mi', - 'limit_ephemeral_storage': '2Gi', - 'request_cpu': '250m', - 'request_memory': '64Mi', - 'request_ephemeral_storage': '1Gi', - } + resources = k8s.V1ResourceRequirements( + requests={'memory': '64Mi', 'cpu': '250m', 'ephemeral-storage': '1Gi'}, + limits={'memory': '64Mi', 'cpu': 0.25, 'nvidia.com/gpu': None, 'ephemeral-storage': '2Gi'}, + ) k = KubernetesPodOperator( namespace='default', image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - 
name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, resources=resources, ) - context = self.create_context(k) + context = create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.expected_pod['spec']['containers'][0]['resources'] = { - 'requests': { - 'memory': '64Mi', - 'cpu': '250m', - 'ephemeral-storage': '1Gi' - }, - 'limits': { - 'memory': '64Mi', - 'cpu': 0.25, - 'ephemeral-storage': '2Gi' - } + 'requests': {'memory': '64Mi', 'cpu': '250m', 'ephemeral-storage': '1Gi'}, + 'limits': {'memory': '64Mi', 'cpu': 0.25, 'nvidia.com/gpu': None, 'ephemeral-storage': '2Gi'}, } self.assertEqual(self.expected_pod, actual_pod) @@ -457,11 +313,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): 'nodeSelectorTerms': [ { 'matchExpressions': [ - { - 'key': 'beta.kubernetes.io/os', - 'operator': 'In', - 'values': ['linux'] - } + {'key': 'beta.kubernetes.io/os', 'operator': 'In', 'values': ['linux']} ] } ] @@ -474,8 +326,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, affinity=affinity, @@ -487,7 +339,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): self.assertEqual(self.expected_pod, actual_pod) def test_port(self): - port = Port('http', 80) + port = k8s.V1ContainerPort( + name='http', + container_port=80, + ) k = KubernetesPodOperator( namespace='default', @@ -495,37 +350,33 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), 
in_cluster=False, do_xcom_push=False, ports=[port], ) - context = self.create_context(k) + context = create_context(k) k.execute(context=context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) - self.expected_pod['spec']['containers'][0]['ports'] = [{ - 'name': 'http', - 'containerPort': 80 - }] + self.expected_pod['spec']['containers'][0]['ports'] = [{'name': 'http', 'containerPort': 80}] self.assertEqual(self.expected_pod, actual_pod) def test_volume_mount(self): - with patch.object(PodLauncher, 'log') as mock_logger: - volume_mount = VolumeMount('test-volume', - mount_path='/tmp/test_volume', - sub_path=None, - read_only=False) - - volume_config = { - 'persistentVolumeClaim': - { - 'claimName': 'test-volume' - } - } - volume = Volume(name='test-volume', configs=volume_config) - args = ["echo \"retrieved from mount\" > /tmp/test_volume/test.txt " - "&& cat /tmp/test_volume/test.txt"] + with mock.patch.object(PodLauncher, 'log') as mock_logger: + volume_mount = k8s.V1VolumeMount( + name='test-volume', mount_path='/tmp/test_volume', sub_path=None, read_only=False + ) + + volume = k8s.V1Volume( + name='test-volume', + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'), + ) + + args = [ + "echo \"retrieved from mount\" > /tmp/test_volume/test.txt " + "&& cat /tmp/test_volume/test.txt" + ] k = KubernetesPodOperator( namespace='default', image="ubuntu:16.04", @@ -534,27 +385,22 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): labels={"foo": "bar"}, volume_mounts=[volume_mount], volumes=[volume], - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, ) context = create_context(k) k.execute(context=context) - mock_logger.info.assert_any_call(b"retrieved from mount\n") + mock_logger.info.assert_any_call('retrieved from mount') actual_pod = self.api_client.sanitize_for_serialization(k.pod) 
self.expected_pod['spec']['containers'][0]['args'] = args - self.expected_pod['spec']['containers'][0]['volumeMounts'] = [{ - 'name': 'test-volume', - 'mountPath': '/tmp/test_volume', - 'readOnly': False - }] - self.expected_pod['spec']['volumes'] = [{ - 'name': 'test-volume', - 'persistentVolumeClaim': { - 'claimName': 'test-volume' - } - }] + self.expected_pod['spec']['containers'][0]['volumeMounts'] = [ + {'name': 'test-volume', 'mountPath': '/tmp/test_volume', 'readOnly': False} + ] + self.expected_pod['spec']['volumes'] = [ + {'name': 'test-volume', 'persistentVolumeClaim': {'claimName': 'test-volume'}} + ] self.assertEqual(self.expected_pod, actual_pod) def test_run_as_user_root(self): @@ -569,8 +415,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, security_context=security_context, @@ -594,8 +440,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, security_context=security_context, @@ -619,8 +465,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-fs-group", + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, security_context=security_context, @@ -639,8 +485,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + 
self.get_current_task_name(), in_cluster=False, do_xcom_push=False, startup_timeout_seconds=5, @@ -660,8 +506,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, startup_timeout_seconds=5, @@ -685,8 +531,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=bad_internal_command, labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, ) @@ -699,15 +545,15 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): def test_xcom_push(self): return_value = '{"foo": "bar"\n, "buzz": 2}' - args = ['echo \'{}\' > /airflow/xcom/return.json'.format(return_value)] + args = ['echo \'' + str(return_value) + '\' > /airflow/xcom/return.json'] k = KubernetesPodOperator( namespace='default', image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=args, labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=True, ) @@ -730,7 +576,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): # GIVEN from airflow.utils.state import State - configmap = 'test-configmap' + configmap_name = "test-config-map" + env_from = [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap_name))] # WHEN k = KubernetesPodOperator( namespace='default', @@ -738,22 +585,17 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), 
in_cluster=False, do_xcom_push=False, - configmaps=[configmap], + env_from=env_from, ) # THEN mock_monitor.return_value = (State.SUCCESS, None) - context = self.create_context(k) + context = create_context(k) k.execute(context) - self.assertEqual( - mock_start.call_args[0][0].spec.containers[0].env_from, - [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource( - name=configmap - ))] - ) + self.assertEqual(mock_start.call_args[0][0].spec.containers[0].env_from, env_from) @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") @@ -761,6 +603,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock): # GIVEN from airflow.utils.state import State + secret_ref = 'secret_name' secrets = [Secret('env', None, secret_ref)] # WHEN @@ -771,34 +614,40 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): arguments=["echo 10"], secrets=secrets, labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, ) # THEN monitor_mock.return_value = (State.SUCCESS, None) - context = self.create_context(k) + context = create_context(k) k.execute(context) self.assertEqual( start_mock.call_args[0][0].spec.containers[0].env_from, - [k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource( - name=secret_ref - ))] + [k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name=secret_ref))], ) def test_env_vars(self): # WHEN + env_vars = [ + k8s.V1EnvVar(name="ENV1", value="val1"), + k8s.V1EnvVar(name="ENV2", value="val2"), + k8s.V1EnvVar( + name="ENV3", + value_from=k8s.V1EnvVarSource(field_ref=k8s.V1ObjectFieldSelector(field_path="status.podIP")), + ), + ] + k = KubernetesPodOperator( namespace='default', image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], - env_vars={"ENV1": 
"val1", "ENV2": "val2", }, - pod_runtime_info_envs=[PodRuntimeInfoEnv("ENV3", "status.podIP")], + env_vars=env_vars, labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, ) @@ -811,14 +660,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): self.expected_pod['spec']['containers'][0]['env'] = [ {'name': 'ENV1', 'value': 'val1'}, {'name': 'ENV2', 'value': 'val2'}, - { - 'name': 'ENV3', - 'valueFrom': { - 'fieldRef': { - 'fieldPath': 'status.podIP' - } - } - } + {'name': 'ENV3', 'valueFrom': {'fieldRef': {'fieldPath': 'status.podIP'}}}, ] self.assertEqual(self.expected_pod, actual_pod) @@ -828,7 +670,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): task_id="task" + self.get_current_task_name(), in_cluster=False, pod_template_file=fixture, - do_xcom_push=True + do_xcom_push=True, ) context = create_context(k) @@ -841,10 +683,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): k = KubernetesPodOperator( task_id="task" + self.get_current_task_name(), labels={"foo": "bar", "fizz": "buzz"}, - env_vars={"env_name": "value"}, + env_vars=[k8s.V1EnvVar(name="env_name", value="value")], in_cluster=False, pod_template_file=fixture, - do_xcom_push=True + do_xcom_push=True, ) context = create_context(k) @@ -856,20 +698,14 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): def test_init_container(self): # GIVEN - volume_mounts = [k8s.V1VolumeMount( - mount_path='/etc/foo', - name='test-volume', - sub_path=None, - read_only=True - )] - - init_environments = [k8s.V1EnvVar( - name='key1', - value='value1' - ), k8s.V1EnvVar( - name='key2', - value='value2' - )] + volume_mounts = [ + k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True) + ] + + init_environments = [ + k8s.V1EnvVar(name='key1', value='value1'), + k8s.V1EnvVar(name='key2', value='value2'), + ] 
init_container = k8s.V1Container( name="init-container", @@ -877,34 +713,20 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): env=init_environments, volume_mounts=volume_mounts, command=["bash", "-cx"], - args=["echo 10"] + args=["echo 10"], ) - volume_config = { - 'persistentVolumeClaim': - { - 'claimName': 'test-volume' - } - } - volume = Volume(name='test-volume', configs=volume_config) - + volume = k8s.V1Volume( + name='test-volume', + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'), + ) expected_init_container = { 'name': 'init-container', 'image': 'ubuntu:16.04', 'command': ['bash', '-cx'], 'args': ['echo 10'], - 'env': [{ - 'name': 'key1', - 'value': 'value1' - }, { - 'name': 'key2', - 'value': 'value2' - }], - 'volumeMounts': [{ - 'mountPath': '/etc/foo', - 'name': 'test-volume', - 'readOnly': True - }], + 'env': [{'name': 'key1', 'value': 'value1'}, {'name': 'key2', 'value': 'value2'}], + 'volumeMounts': [{'mountPath': '/etc/foo', 'name': 'test-volume', 'readOnly': True}], } k = KubernetesPodOperator( @@ -913,8 +735,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), volumes=[volume], init_containers=[init_container], in_cluster=False, @@ -924,30 +746,30 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.expected_pod['spec']['initContainers'] = [expected_init_container] - self.expected_pod['spec']['volumes'] = [{ - 'name': 'test-volume', - 'persistentVolumeClaim': { - 'claimName': 'test-volume' - } - }] + self.expected_pod['spec']['volumes'] = [ + {'name': 'test-volume', 'persistentVolumeClaim': {'claimName': 'test-volume'}} + ] self.assertEqual(self.expected_pod, actual_pod) 
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") - def test_pod_template_file(self, mock_client, monitor_mock, start_mock): + def test_pod_template_file( + self, mock_client, monitor_mock, start_mock # pylint: disable=unused-argument + ): from airflow.utils.state import State - fixture = sys.path[0] + '/tests/kubernetes/pod.yaml' + + path = sys.path[0] + '/tests/kubernetes/pod.yaml' k = KubernetesPodOperator( - task_id='task', - pod_template_file=fixture, - do_xcom_push=True + task_id="task" + self.get_current_task_name(), pod_template_file=path, do_xcom_push=True ) + monitor_mock.return_value = (State.SUCCESS, None) context = create_context(k) with self.assertLogs(k.log, level=logging.DEBUG) as cm: k.execute(context) - expected_line = textwrap.dedent("""\ + expected_line = textwrap.dedent( + """\ DEBUG:airflow.task.operators:Starting pod: api_version: v1 kind: Pod @@ -956,65 +778,57 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cluster_name: null creation_timestamp: null deletion_grace_period_seconds: null\ - """).strip() + """ + ).strip() self.assertTrue(any(line.startswith(expected_line) for line in cm.output)) actual_pod = self.api_client.sanitize_for_serialization(k.pod) - expected_dict = {'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': {'annotations': {}, - 'labels': {}, - 'name': 'memory-demo', - 'namespace': 'mem-example'}, - 'spec': {'affinity': {}, - 'containers': [{'args': ['--vm', - '1', - '--vm-bytes', - '150M', - '--vm-hang', - '1'], - 'command': ['stress'], - 'env': [], - 'envFrom': [], - 'image': 'apache/airflow:stress-2020.07.10-1.0.4', - 'imagePullPolicy': 'IfNotPresent', - 'name': 'base', - 'ports': [], - 'resources': {'limits': {'memory': '200Mi'}, - 'requests': {'memory': '100Mi'}}, - 'volumeMounts': [{'mountPath': '/airflow/xcom', - 'name': 'xcom'}]}, - {'command': ['sh', - 
'-c', - 'trap "exit 0" INT; while true; do sleep ' - '30; done;'], - 'image': 'alpine', - 'name': 'airflow-xcom-sidecar', - 'resources': {'requests': {'cpu': '1m'}}, - 'volumeMounts': [{'mountPath': '/airflow/xcom', - 'name': 'xcom'}]}], - 'hostNetwork': False, - 'imagePullSecrets': [], - 'initContainers': [], - 'nodeSelector': {}, - 'restartPolicy': 'Never', - 'securityContext': {}, - 'serviceAccountName': 'default', - 'tolerations': [], - 'volumes': [{'emptyDir': {}, 'name': 'xcom'}]}} + expected_dict = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': {'annotations': {}, 'labels': {}, 'name': 'memory-demo', 'namespace': 'mem-example'}, + 'spec': { + 'affinity': {}, + 'containers': [ + { + 'args': ['--vm', '1', '--vm-bytes', '150M', '--vm-hang', '1'], + 'command': ['stress'], + 'env': [], + 'envFrom': [], + 'image': 'apache/airflow:stress-2020.07.10-1.0.4', + 'name': 'base', + 'ports': [], + 'resources': {'limits': {'memory': '200Mi'}, 'requests': {'memory': '100Mi'}}, + 'volumeMounts': [{'mountPath': '/airflow/xcom', 'name': 'xcom'}], + }, + { + 'command': ['sh', '-c', 'trap "exit 0" INT; while true; do sleep 30; done;'], + 'image': 'alpine', + 'name': 'airflow-xcom-sidecar', + 'resources': {'requests': {'cpu': '1m'}}, + 'volumeMounts': [{'mountPath': '/airflow/xcom', 'name': 'xcom'}], + }, + ], + 'hostNetwork': False, + 'imagePullSecrets': [], + 'initContainers': [], + 'nodeSelector': {}, + 'restartPolicy': 'Never', + 'securityContext': {}, + 'serviceAccountName': 'default', + 'tolerations': [], + 'volumes': [{'emptyDir': {}, 'name': 'xcom'}], + }, + } self.assertEqual(expected_dict, actual_pod) @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") def test_pod_priority_class_name( - self, - mock_client, - monitor_mock, - start_mock): # pylint: disable=unused-argument - """Test ability to assign 
priorityClassName to pod - - """ + self, mock_client, monitor_mock, start_mock # pylint: disable=unused-argument + ): + """Test ability to assign priorityClassName to pod""" from airflow.utils.state import State priority_class_name = "medium-test" @@ -1024,15 +838,15 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): cmds=["bash", "-cx"], arguments=["echo 10"], labels={"foo": "bar"}, - name="test", - task_id="task", + name="test-" + str(random.randint(0, 1000000)), + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, priority_class_name=priority_class_name, ) monitor_mock.return_value = (State.SUCCESS, None) - context = self.create_context(k) + context = create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.expected_pod['spec']['priorityClassName'] = priority_class_name @@ -1048,15 +862,15 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): arguments=["echo 10"], labels={"foo": "bar"}, name=pod_name_too_long, - task_id="task", + task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, ) @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") - def test_on_kill(self, - monitor_mock): # pylint: disable=unused-argument + def test_on_kill(self, monitor_mock): # pylint: disable=unused-argument from airflow.utils.state import State + client = kube_client.get_kube_client(in_cluster=False) name = "test" namespace = "default" @@ -1082,4 +896,46 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): with self.assertRaises(ApiException): pod = client.read_namespaced_pod(name=name, namespace=namespace) + def test_reattach_failing_pod_once(self): + from airflow.utils.state import State + + client = kube_client.get_kube_client(in_cluster=False) + name = "test" + namespace = "default" + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["exit 1"], + labels={"foo": 
"bar"}, + name="test", + task_id=name, + in_cluster=False, + do_xcom_push=False, + is_delete_operator_pod=False, + termination_grace_period=0, + ) + + context = create_context(k) + + with mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") as monitor_mock: + monitor_mock.return_value = (State.SUCCESS, None) + k.execute(context) + name = k.pod.metadata.name + pod = client.read_namespaced_pod(name=name, namespace=namespace) + while pod.status.phase != "Failed": + pod = client.read_namespaced_pod(name=name, namespace=namespace) + with self.assertRaises(AirflowException): + k.execute(context) + pod = client.read_namespaced_pod(name=name, namespace=namespace) + self.assertEqual(pod.metadata.labels["already_checked"], "True") + with mock.patch( + "airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator" + ".create_new_pod_for_operator" + ) as create_mock: + create_mock.return_value = ("success", {}, {}) + k.execute(context) + create_mock.assert_called_once() + + # pylint: enable=unused-argument diff --git a/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh b/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh index 4f13335..bcb5cf4 100755 --- a/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh +++ b/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh @@ -21,7 +21,6 @@ kind::make_sure_kubernetes_tools_are_installed kind::get_kind_cluster_name -traps::add_trap kind::stop_kubectl EXIT HUP INT TERM traps::add_trap kind::dump_kind_logs EXIT HUP INT TERM interactive="false" @@ -82,8 +81,13 @@ if [[ ! -d ${virtualenv_path} ]]; then python -m venv "${virtualenv_path}" fi +# In Python 3.5 activating virtualenv hits undefined variable +set +u + . "${virtualenv_path}/bin/activate" +set -u + pip install --upgrade pip==20.2.3 pip install pytest freezegun pytest-cov \ @@ -105,7 +109,6 @@ if [[ ${interactive} == "true" ]]; then echo echo "You are entering the virtualenv now. 
Type exit to exit back to the original shell" echo - kubectl config set-context --current --namespace=airflow exec "${SHELL}" else pytest "${pytest_args[@]}" "${tests_to_run[@]}" diff --git a/scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh b/scripts/ci/kubernetes/ci_setup_cluster_and_deploy_airflow_to_kubernetes.sh similarity index 94% copy from scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh copy to scripts/ci/kubernetes/ci_setup_cluster_and_deploy_airflow_to_kubernetes.sh index 2a7455a..e12f809 100755 --- a/scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh +++ b/scripts/ci/kubernetes/ci_setup_cluster_and_deploy_airflow_to_kubernetes.sh @@ -23,10 +23,11 @@ traps::add_trap "kind::dump_kind_logs" EXIT HUP INT TERM kind::make_sure_kubernetes_tools_are_installed kind::get_kind_cluster_name +kind::perform_kind_cluster_operation "start" build_images::prepare_prod_build build_images::build_prod_images kind::build_image_for_kubernetes_tests kind::load_image_to_kind_cluster kind::deploy_airflow_with_helm -kind::forward_port_to_kind_webserver kind::deploy_test_kubernetes_resources +kind::wait_for_webserver_healthy diff --git a/scripts/ci/kubernetes/kind-cluster-conf.yaml b/scripts/ci/kubernetes/kind-cluster-conf.yaml index 348fb68..df60820 100644 --- a/scripts/ci/kubernetes/kind-cluster-conf.yaml +++ b/scripts/ci/kubernetes/kind-cluster-conf.yaml @@ -23,6 +23,11 @@ networking: nodes: - role: control-plane - role: worker + extraPortMappings: + - containerPort: 30007 + hostPort: 8080 + listenAddress: "0.0.0.0" + protocol: TCP kubeadmConfigPatchesJson6902: - group: kubeadm.k8s.io version: v1beta2 diff --git a/scripts/ci/kubernetes/kind-cluster-conf.yaml b/scripts/ci/kubernetes/nodeport.yaml similarity index 68% copy from scripts/ci/kubernetes/kind-cluster-conf.yaml copy to scripts/ci/kubernetes/nodeport.yaml index 348fb68..8438281 100644 --- a/scripts/ci/kubernetes/kind-cluster-conf.yaml +++ b/scripts/ci/kubernetes/nodeport.yaml @@ -15,19 +15,17 @@ # specific 
language governing permissions and limitations # under the License. --- -kind: Cluster -apiVersion: kind.sigs.k8s.io/v1alpha3 -networking: - apiServerAddress: 0.0.0.0 - apiServerPort: 19090 -nodes: - - role: control-plane - - role: worker -kubeadmConfigPatchesJson6902: - - group: kubeadm.k8s.io - version: v1beta2 - kind: ClusterConfiguration - patch: | - - op: add - path: /apiServer/certSANs/- - value: docker +apiVersion: v1 +kind: Service +metadata: + name: airflow-webserver-node-port +spec: + type: NodePort + selector: + component: webserver + release: airflow + tier: airflow + ports: + - port: 8080 + targetPort: 8080 + nodePort: 30007 diff --git a/scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh b/scripts/ci/kubernetes/redeploy_airflow.sh similarity index 86% rename from scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh rename to scripts/ci/kubernetes/redeploy_airflow.sh index 2a7455a..7803d7c 100755 --- a/scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh +++ b/scripts/ci/kubernetes/redeploy_airflow.sh @@ -23,10 +23,6 @@ traps::add_trap "kind::dump_kind_logs" EXIT HUP INT TERM kind::make_sure_kubernetes_tools_are_installed kind::get_kind_cluster_name -build_images::prepare_prod_build -build_images::build_prod_images -kind::build_image_for_kubernetes_tests -kind::load_image_to_kind_cluster kind::deploy_airflow_with_helm -kind::forward_port_to_kind_webserver kind::deploy_test_kubernetes_resources +kind::wait_for_webserver_healthy diff --git a/scripts/ci/libraries/_kind.sh b/scripts/ci/libraries/_kind.sh index 6194742..defa4de 100644 --- a/scripts/ci/libraries/_kind.sh +++ b/scripts/ci/libraries/_kind.sh @@ -16,14 +16,16 @@ # specific language governing permissions and limitations # under the License. 
- -function kind::get_kind_cluster_name(){ +function kind::get_kind_cluster_name() { # Name of the KinD cluster to connect to export KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME:="airflow-python-${PYTHON_MAJOR_MINOR_VERSION}-${KUBERNETES_VERSION}"} readonly KIND_CLUSTER_NAME # Name of the KinD cluster to connect to when referred to via kubectl export KUBECTL_CLUSTER_NAME=kind-${KIND_CLUSTER_NAME} readonly KUBECTL_CLUSTER_NAME + export KUBECONFIG="${BUILD_CACHE_DIR}/.kube/config" + mkdir -pv "${BUILD_CACHE_DIR}/.kube/" + touch "${KUBECONFIG}" } function kind::dump_kind_logs() { @@ -40,7 +42,7 @@ function kind::dump_kind_logs() { } function kind::make_sure_kubernetes_tools_are_installed() { - SYSTEM=$(uname -s| tr '[:upper:]' '[:lower:]') + SYSTEM=$(uname -s | tr '[:upper:]' '[:lower:]') KIND_URL="https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-${SYSTEM}-amd64" mkdir -pv "${BUILD_CACHE_DIR}/bin" @@ -48,7 +50,7 @@ function kind::make_sure_kubernetes_tools_are_installed() { DOWNLOADED_KIND_VERSION=v"$(${KIND_BINARY_PATH} --version | awk '{ print $3 }')" echo "Currently downloaded kind version = ${DOWNLOADED_KIND_VERSION}" fi - if [[ ! -f "${KIND_BINARY_PATH}" || ${DOWNLOADED_KIND_VERSION} != "${KIND_VERSION}" ]]; then + if [[ ! -f "${KIND_BINARY_PATH}" || ${DOWNLOADED_KIND_VERSION} != "${KIND_VERSION}" ]]; then echo echo "Downloading Kind version ${KIND_VERSION}" repeats::run_with_retry 4 \ @@ -94,24 +96,10 @@ function kind::make_sure_kubernetes_tools_are_installed() { } function kind::create_cluster() { - if [[ "${TRAVIS:="false"}" == "true" ]]; then - # Travis CI does not handle the nice output of Kind well, so we need to capture it - # And display only if kind fails to start - start_output_heartbeat "Creating kubernetes cluster" 10 - set +e - if ! 
OUTPUT=$(kind create cluster \ - --name "${KIND_CLUSTER_NAME}" \ - --config "${AIRFLOW_SOURCES}/scripts/ci/kubernetes/kind-cluster-conf.yaml" \ - --image "kindest/node:${KUBERNETES_VERSION}" 2>&1); then - echo "${OUTPUT}" - fi - stop_output_heartbeat - else - kind create cluster \ - --name "${KIND_CLUSTER_NAME}" \ - --config "${AIRFLOW_SOURCES}/scripts/ci/kubernetes/kind-cluster-conf.yaml" \ - --image "kindest/node:${KUBERNETES_VERSION}" - fi + kind create cluster \ + --name "${KIND_CLUSTER_NAME}" \ + --config "${AIRFLOW_SOURCES}/scripts/ci/kubernetes/kind-cluster-conf.yaml" \ + --image "kindest/node:${KUBERNETES_VERSION}" echo echo "Created cluster ${KIND_CLUSTER_NAME}" echo @@ -125,9 +113,12 @@ function kind::delete_cluster() { rm -rf "${HOME}/.kube/*" } -function kind::perform_kind_cluster_operation() { - ALLOWED_KIND_OPERATIONS="[ start restart stop deploy test shell recreate ]" +function kind::set_current_context() { + kubectl config set-context --current --namespace=airflow +} +function kind::perform_kind_cluster_operation() { + ALLOWED_KIND_OPERATIONS="[ start restart stop deploy test shell recreate k9s]" set +u if [[ -z "${1=}" ]]; then echo >&2 @@ -170,6 +161,7 @@ function kind::perform_kind_cluster_operation() { echo kind::delete_cluster kind::create_cluster + kind::set_current_context elif [[ ${OPERATION} == "stop" ]]; then echo echo "Deleting cluster" @@ -181,20 +173,35 @@ function kind::perform_kind_cluster_operation() { echo "Deploying Airflow to KinD" echo kind::build_image_for_kubernetes_tests + kind::get_kind_cluster_name kind::load_image_to_kind_cluster kind::deploy_airflow_with_helm - kind::forward_port_to_kind_webserver kind::deploy_test_kubernetes_resources + kind::wait_for_webserver_healthy elif [[ ${OPERATION} == "test" ]]; then echo echo "Testing with KinD" echo + kind::set_current_context "${AIRFLOW_SOURCES}/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh" elif [[ ${OPERATION} == "shell" ]]; then echo echo "Entering an interactive shell 
for kubernetes testing" echo + kind::set_current_context "${AIRFLOW_SOURCES}/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh" "-i" + elif [[ ${OPERATION} == "k9s" ]]; then + echo + echo "Starting k9s CLI" + echo + export TERM=xterm-256color + export EDITOR=vim + export K9S_EDITOR=vim + kind::set_current_context + exec docker run --rm -it --network host \ + -e COLUMNS="$(tput cols)" -e LINES="$(tput lines)" \ + -e EDITOR -e K9S_EDITOR \ + -v "${KUBECONFIG}:/root/.kube/config" quay.io/derailed/k9s else echo >&2 echo >&2 "Wrong cluster operation: ${OPERATION}. Should be one of: ${ALLOWED_KIND_OPERATIONS}" @@ -213,8 +220,7 @@ function kind::perform_kind_cluster_operation() { echo "Creating cluster" echo kind::create_cluster - elif [[ ${OPERATION} == "stop" || ${OPERATION} == "deploy" || \ - ${OPERATION} == "test" || ${OPERATION} == "shell" ]]; then + elif [[ ${OPERATION} == "stop" || ${OPERATION} == "deploy" || ${OPERATION} == "test" || ${OPERATION} == "shell" ]]; then echo >&2 echo >&2 "Cluster ${KIND_CLUSTER_NAME} does not exist. It should exist for ${OPERATION} operation" echo >&2 @@ -245,7 +251,6 @@ function kind::check_cluster_ready_for_airflow() { kubectl create namespace test-namespace --cluster "${KUBECTL_CLUSTER_NAME}" } - function kind::build_image_for_kubernetes_tests() { cd "${AIRFLOW_SOURCES}" || exit 1 docker build --tag "${AIRFLOW_PROD_IMAGE_KUBERNETES}" . 
-f - <<EOF @@ -268,61 +273,37 @@ function kind::load_image_to_kind_cluster() { kind load docker-image --name "${KIND_CLUSTER_NAME}" "${AIRFLOW_PROD_IMAGE_KUBERNETES}" } -MAX_NUM_TRIES_FOR_PORT_FORWARD=12 -readonly MAX_NUM_TRIES_FOR_PORT_FORWARD +MAX_NUM_TRIES_FOR_HEALTH_CHECK=12 +readonly MAX_NUM_TRIES_FOR_HEALTH_CHECK -SLEEP_TIME_FOR_PORT_FORWARD=10 -readonly SLEEP_TIME_FOR_PORT_FORWARD - -forwarded_port_number=8080 - -function kind::start_kubectl_forward() { - echo - echo "Trying to forward port ${forwarded_port_number} to 8080 on server" - echo - kubectl port-forward svc/airflow-webserver "${forwarded_port_number}:8080" --namespace airflow >/dev/null & -} +SLEEP_TIME_FOR_HEALTH_CHECK=10 +readonly SLEEP_TIME_FOR_HEALTH_CHECK -function kind::stop_kubectl() { - echo - echo "Stops all kubectl instances" - echo - killall kubectl || true - sleep 10 - killall -s KILL kubectl || true +FORWARDED_PORT_NUMBER=8080 +readonly FORWARDED_PORT_NUMBER -} -function kind::forward_port_to_kind_webserver() { +function kind::wait_for_webserver_healthy() { num_tries=0 set +e - kind::start_kubectl_forward - sleep "${SLEEP_TIME_FOR_PORT_FORWARD}" - while ! curl "http://localhost:${forwarded_port_number}/health" -s | grep -q healthy; do + sleep "${SLEEP_TIME_FOR_HEALTH_CHECK}" + while ! curl "http://localhost:${FORWARDED_PORT_NUMBER}/health" -s | grep -q healthy; do echo - echo "Trying to establish port forwarding to 'airflow webserver'" + echo "Sleeping ${SLEEP_TIME_FOR_HEALTH_CHECK} seconds while waiting for the webserver to be ready" echo - if [[ ${INCREASE_PORT_NUMBER_FOR_KUBERNETES=} == "true" ]] ; then - forwarded_port_number=$(( forwarded_port_number + 1 )) + sleep "${SLEEP_TIME_FOR_HEALTH_CHECK}" + num_tries=$((num_tries + 1)) + if [[ ${num_tries} == "${MAX_NUM_TRIES_FOR_HEALTH_CHECK}" ]]; then + >&2 echo + >&2 echo "Timeout while waiting for the webserver health check" + >&2 echo fi - if [[ ${num_tries} == "${MAX_NUM_TRIES_FOR_PORT_FORWARD}" ]]; then - echo >&2 - echo >&2 "ERROR! 
Could not setup a forward port to Airflow's webserver after ${num_tries}! Exiting." - echo >&2 - exit 1 - fi - echo - echo "Trying to establish port forwarding to 'airflow webserver'" - echo - kind::start_kubectl_forward - sleep "${SLEEP_TIME_FOR_PORT_FORWARD}" - num_tries=$(( num_tries + 1)) done echo - echo "Connection to 'airflow webserver' established on port ${forwarded_port_number}" + echo "Connection to 'airflow webserver' established on port ${FORWARDED_PORT_NUMBER}" echo - initialization::ga_env CLUSTER_FORWARDED_PORT "${forwarded_port_number}" - export CLUSTER_FORWARDED_PORT="${forwarded_port_number}" + initialization::ga_env CLUSTER_FORWARDED_PORT "${FORWARDED_PORT_NUMBER}" + export CLUSTER_FORWARDED_PORT="${FORWARDED_PORT_NUMBER}" set -e } @@ -348,16 +329,15 @@ function kind::deploy_airflow_with_helm() { popd || exit 1 } - function kind::deploy_test_kubernetes_resources() { echo echo "Deploying Custom kubernetes resources" echo kubectl apply -f "scripts/ci/kubernetes/volumes.yaml" --namespace default kubectl apply -f "scripts/ci/kubernetes/secrets.yaml" --namespace default + kubectl apply -f "scripts/ci/kubernetes/nodeport.yaml" --namespace airflow } - function kind::dump_kubernetes_logs() { POD=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' \ --cluster "${KUBECTL_CLUSTER_NAME}" | grep airflow | head -1)
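The new `kind::wait_for_webserver_healthy` function above polls the webserver's `/health` endpoint through the kind `extraPortMappings` + NodePort chain instead of managing a background kubectl port forward. Its retry pattern can be sketched in isolation; this is a minimal sketch, with a hypothetical `check_health` stub standing in for the real `curl http://localhost:8080/health | grep -q healthy` probe (and, unlike the script, the sketch returns non-zero on timeout rather than only printing a warning):

```shell
#!/usr/bin/env bash
# Minimal sketch of the retry loop behind kind::wait_for_webserver_healthy.
set -u

MAX_NUM_TRIES=12   # mirrors MAX_NUM_TRIES_FOR_HEALTH_CHECK
SLEEP_TIME=0       # the real script uses SLEEP_TIME_FOR_HEALTH_CHECK=10

wait_for_healthy() {
    local num_tries=0
    # Keep probing until the (stubbed) health check succeeds.
    until check_health; do
        num_tries=$((num_tries + 1))
        if [[ ${num_tries} -ge ${MAX_NUM_TRIES} ]]; then
            echo "Timeout while waiting for the webserver health check" >&2
            return 1
        fi
        sleep "${SLEEP_TIME}"
    done
    echo "Webserver is healthy after ${num_tries} failed probes"
}

# Hypothetical stub probe: reports healthy on the third attempt.
attempt=0
check_health() {
    attempt=$((attempt + 1))
    [[ ${attempt} -ge 3 ]]
}

wait_for_healthy   # prints: Webserver is healthy after 2 failed probes
```

Because the kind config maps host port 8080 to NodePort 30007 (which the `nodeport.yaml` Service routes to the webserver's port 8080), the real probe is a plain curl against localhost; nothing needs to be forwarded or killed on exit.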