This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 91bb877  Use the Stable REST API for Kubernetes executor integration tests (#15644)
91bb877 is described below

commit 91bb877ff4b0e0934cb081dd103898bd7386c21e
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue May 4 18:02:54 2021 +0100

    Use the Stable REST API for Kubernetes executor integration tests (#15644)
    
    Currently, we use the experimental REST API to run the Kubernetes executor integration tests.
    This PR changes these tests to use the stable REST API instead.
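    
    As a rough illustration of the flow these tests now exercise, here is a
    minimal sketch (not taken from this commit; the host, port, and the
    admin/admin basic-auth user are assumptions based on the CI setup below):
    
        import requests
    
        host = "localhost:8080"   # assumed webserver address in the kind cluster
        dag_id = "example_kubernetes_executor_config"
    
        session = requests.Session()
        session.auth = ("admin", "admin")  # assumed basic_auth credentials
    
        # Unpause the DAG; the stable API uses PATCH instead of the
        # experimental GET .../paused/false endpoint.
        session.patch(f"http://{host}/api/v1/dags/{dag_id}", json={"is_paused": False})
    
        # Trigger a run and keep its dag_run_id for later lookups.
        run = session.post(f"http://{host}/api/v1/dags/{dag_id}/dagRuns", json={}).json()
    
        # Poll a task instance by dag_run_id rather than execution_date.
        ti = session.get(
            f"http://{host}/api/v1/dags/{dag_id}/dagRuns/{run['dag_run_id']}"
            "/taskInstances/start_task"
        ).json()
        print(ti["state"])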
---
 kubernetes_tests/test_kubernetes_executor.py | 51 +++++++++++++++-------------
 scripts/ci/libraries/_kind.sh                |  3 +-
 2 files changed, 29 insertions(+), 25 deletions(-)

diff --git a/kubernetes_tests/test_kubernetes_executor.py b/kubernetes_tests/test_kubernetes_executor.py
index 2933547..169c957 100644
--- a/kubernetes_tests/test_kubernetes_executor.py
+++ b/kubernetes_tests/test_kubernetes_executor.py
@@ -73,6 +73,7 @@ class TestKubernetesExecutor(unittest.TestCase):
 
     def _get_session_with_retries(self):
         session = requests.Session()
+        session.auth = ('admin', 'admin')
         retries = Retry(total=3, backoff_factor=1)
         session.mount('http://', HTTPAdapter(max_retries=retries))
         session.mount('https://', HTTPAdapter(max_retries=retries))
@@ -93,18 +94,17 @@ class TestKubernetesExecutor(unittest.TestCase):
     def tearDown(self):
         self.session.close()
 
-    def monitor_task(self, host, execution_date, dag_id, task_id, expected_final_state, timeout):
+    def monitor_task(self, host, dag_run_id, dag_id, task_id, expected_final_state, timeout):
         tries = 0
         state = ''
         max_tries = max(int(timeout / 5), 1)
         # Wait some time for the operator to complete
         while tries < max_tries:
             time.sleep(5)
-            # Trigger a new dagrun
+            # Check task state
             try:
                 get_string = (
-                    f'http://{host}/api/experimental/dags/{dag_id}/'
-                    f'dag_runs/{execution_date}/tasks/{task_id}'
+                    f'http://{host}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}'
                 )
                 print(f"Calling [monitor_task]#1 {get_string}")
                 result = self.session.get(get_string)
@@ -136,14 +136,17 @@ class TestKubernetesExecutor(unittest.TestCase):
         # Wait some time for the operator to complete
         while tries < max_tries:
             time.sleep(5)
-            get_string = f'http://{host}/api/experimental/dags/{dag_id}/dag_runs/{execution_date}'
+            get_string = f'http://{host}/api/v1/dags/{dag_id}/dagRuns'
             print(f"Calling {get_string}")
-            # Trigger a new dagrun
+            # Get all dagruns
             result = self.session.get(get_string)
             assert result.status_code == 200, "Could not get the status"
             result_json = result.json()
             print(f"Received: {result}")
-            state = result_json['state']
+            state = None
+            for dag_run in result_json['dag_runs']:
+                if dag_run['execution_date'] == execution_date:
+                    state = dag_run['state']
             check_call(["echo", f"Attempt {tries}: Current state of dag is {state}"])
             print(f"Attempt {tries}: Current state of dag is {state}")
 
@@ -157,16 +160,16 @@ class TestKubernetesExecutor(unittest.TestCase):
         # Maybe check if we can retrieve the logs, but then we need to extend the API
 
     def start_dag(self, dag_id, host):
-        get_string = f'http://{host}/api/experimental/dags/{dag_id}/paused/false'
-        print(f"Calling [start_dag]#1 {get_string}")
-        result = self.session.get(get_string)
+        patch_string = f'http://{host}/api/v1/dags/{dag_id}'
+        print(f"Calling [start_dag]#1 {patch_string}")
+        result = self.session.patch(patch_string, json={'is_paused': False})
         try:
             result_json = result.json()
         except ValueError:
             result_json = str(result)
         print(f"Received [start_dag]#1 {result_json}")
         assert result.status_code == 200, f"Could not enable DAG: {result_json}"
-        post_string = f'http://{host}/api/experimental/dags/{dag_id}/dag_runs'
+        post_string = f'http://{host}/api/v1/dags/{dag_id}/dagRuns'
         print(f"Calling [start_dag]#2 {post_string}")
         # Trigger a new dagrun
         result = self.session.post(post_string, json={})
@@ -179,36 +182,38 @@ class TestKubernetesExecutor(unittest.TestCase):
 
         time.sleep(1)
 
-        get_string = f'http://{host}/api/experimental/latest_runs'
+        get_string = f'http://{host}/api/v1/dags/{dag_id}/dagRuns'
         print(f"Calling [start_dag]#3 {get_string}")
         result = self.session.get(get_string)
-        assert result.status_code == 200, f"Could not get the latest DAG-run: {result.json()}"
+        assert result.status_code == 200, f"Could not get DAGRuns: {result.json()}"
         result_json = result.json()
         print(f"Received: [start_dag]#3 {result_json}")
         return result_json
 
     def start_job_in_kubernetes(self, dag_id, host):
         result_json = self.start_dag(dag_id=dag_id, host=host)
-        assert len(result_json['items']) > 0
+        dag_runs = result_json['dag_runs']
+        assert len(dag_runs) > 0
         execution_date = None
-        for dag_run in result_json['items']:
+        dag_run_id = None
+        for dag_run in dag_runs:
             if dag_run['dag_id'] == dag_id:
                 execution_date = dag_run['execution_date']
+                dag_run_id = dag_run['dag_run_id']
                 break
         assert execution_date is not None, f"No execution_date can be found for the dag with {dag_id}"
-        return execution_date
+        return dag_run_id, execution_date
 
     def test_integration_run_dag(self):
         host = KUBERNETES_HOST_PORT
         dag_id = 'example_kubernetes_executor_config'
-
-        execution_date = self.start_job_in_kubernetes(dag_id, host)
-        print(f"Found the job with execution date {execution_date}")
+        dag_run_id, execution_date = self.start_job_in_kubernetes(dag_id, host)
+        print(f"Found the job with execution_date {execution_date}")
 
         # Wait some time for the operator to complete
         self.monitor_task(
             host=host,
-            execution_date=execution_date,
+            dag_run_id=dag_run_id,
             dag_id=dag_id,
             task_id='start_task',
             expected_final_state='success',
@@ -227,7 +232,7 @@ class TestKubernetesExecutor(unittest.TestCase):
         host = KUBERNETES_HOST_PORT
         dag_id = 'example_kubernetes_executor_config'
 
-        execution_date = self.start_job_in_kubernetes(dag_id, host)
+        dag_run_id, execution_date = self.start_job_in_kubernetes(dag_id, host)
 
         self._delete_airflow_pod("scheduler")
 
@@ -236,7 +241,7 @@ class TestKubernetesExecutor(unittest.TestCase):
         # Wait some time for the operator to complete
         self.monitor_task(
             host=host,
-            execution_date=execution_date,
+            dag_run_id=dag_run_id,
             dag_id=dag_id,
             task_id='start_task',
             expected_final_state='success',
@@ -245,7 +250,7 @@ class TestKubernetesExecutor(unittest.TestCase):
 
         self.monitor_task(
             host=host,
-            execution_date=execution_date,
+            dag_run_id=dag_run_id,
             dag_id=dag_id,
             task_id='other_namespace_task',
             expected_final_state='success',
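
Side note on the hunks above: the stable list endpoint wraps results in a
"dag_runs" key (the experimental /latest_runs endpoint used "items"), which is
why the code now scans the listing and matches on execution_date. A minimal
self-contained sketch of that lookup; host and credentials are assumptions
carried over from the test setup:

    import requests

    host = "localhost:8080"   # assumed webserver address
    dag_id = "example_kubernetes_executor_config"

    session = requests.Session()
    session.auth = ("admin", "admin")  # assumed basic_auth credentials

    # Trigger a run, then find it again in the paginated listing.
    triggered = session.post(f"http://{host}/api/v1/dags/{dag_id}/dagRuns", json={}).json()

    listing = session.get(f"http://{host}/api/v1/dags/{dag_id}/dagRuns").json()
    state = None
    for dag_run in listing["dag_runs"]:  # stable key; experimental used "items"
        if dag_run["execution_date"] == triggered["execution_date"]:
            state = dag_run["state"]
    print(state)
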
diff --git a/scripts/ci/libraries/_kind.sh b/scripts/ci/libraries/_kind.sh
index a7e75d7..268effb 100644
--- a/scripts/ci/libraries/_kind.sh
+++ b/scripts/ci/libraries/_kind.sh
@@ -331,8 +331,7 @@ function kind::deploy_airflow_with_helm() {
         --set "images.airflow.repository=${DOCKERHUB_USER}/${DOCKERHUB_REPO}" \
         --set "images.airflow.tag=${AIRFLOW_PROD_BASE_TAG}-kubernetes" -v 1 \
         --set "defaultAirflowTag=${AIRFLOW_PROD_BASE_TAG}-kubernetes" -v 1 \
-        --set "config.api.auth_backend=airflow.api.auth.backend.default" \
-        --set "config.api.enable_experimental_api=true" \
+        --set "config.api.auth_backend=airflow.api.auth.backend.basic_auth" \
         --set "config.logging.logging_level=DEBUG" \
         --set "executor=KubernetesExecutor"
     echo
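
Note on the _kind.sh change: dropping enable_experimental_api and switching
auth_backend to basic_auth means every API call must now carry credentials,
which is what the session.auth addition in the test class provides. A quick
sketch of the difference (host and credentials assumed from the CI setup):

    import requests

    host = "localhost:8080"  # assumed webserver address in the kind cluster

    # Without credentials the basic_auth backend rejects the request.
    r = requests.get(f"http://{host}/api/v1/dags")
    print(r.status_code)  # expected: 401

    # With the admin user created for the tests, the same call succeeds.
    r = requests.get(f"http://{host}/api/v1/dags", auth=("admin", "admin"))
    print(r.status_code)  # expected: 200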
