This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 91bb877 Use the Stable REST API for Kubernetes executor integration
tests (#15644)
91bb877 is described below
commit 91bb877ff4b0e0934cb081dd103898bd7386c21e
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue May 4 18:02:54 2021 +0100
Use the Stable REST API for Kubernetes executor integration tests (#15644)
Currently, we use the experimental REST API to run the Kubernetes executor
integration tests.
This PR changes this to use the stable REST API for these tests
---
kubernetes_tests/test_kubernetes_executor.py | 51 +++++++++++++++-------------
scripts/ci/libraries/_kind.sh | 3 +-
2 files changed, 29 insertions(+), 25 deletions(-)
diff --git a/kubernetes_tests/test_kubernetes_executor.py
b/kubernetes_tests/test_kubernetes_executor.py
index 2933547..169c957 100644
--- a/kubernetes_tests/test_kubernetes_executor.py
+++ b/kubernetes_tests/test_kubernetes_executor.py
@@ -73,6 +73,7 @@ class TestKubernetesExecutor(unittest.TestCase):
def _get_session_with_retries(self):
session = requests.Session()
+ session.auth = ('admin', 'admin')
retries = Retry(total=3, backoff_factor=1)
session.mount('http://', HTTPAdapter(max_retries=retries))
session.mount('https://', HTTPAdapter(max_retries=retries))
@@ -93,18 +94,17 @@ class TestKubernetesExecutor(unittest.TestCase):
def tearDown(self):
self.session.close()
- def monitor_task(self, host, execution_date, dag_id, task_id,
expected_final_state, timeout):
+ def monitor_task(self, host, dag_run_id, dag_id, task_id,
expected_final_state, timeout):
tries = 0
state = ''
max_tries = max(int(timeout / 5), 1)
# Wait some time for the operator to complete
while tries < max_tries:
time.sleep(5)
- # Trigger a new dagrun
+ # Check task state
try:
get_string = (
- f'http://{host}/api/experimental/dags/{dag_id}/'
- f'dag_runs/{execution_date}/tasks/{task_id}'
+
f'http://{host}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}'
)
print(f"Calling [monitor_task]#1 {get_string}")
result = self.session.get(get_string)
@@ -136,14 +136,17 @@ class TestKubernetesExecutor(unittest.TestCase):
# Wait some time for the operator to complete
while tries < max_tries:
time.sleep(5)
- get_string =
f'http://{host}/api/experimental/dags/{dag_id}/dag_runs/{execution_date}'
+ get_string = f'http://{host}/api/v1/dags/{dag_id}/dagRuns'
print(f"Calling {get_string}")
- # Trigger a new dagrun
+ # Get all dagruns
result = self.session.get(get_string)
assert result.status_code == 200, "Could not get the status"
result_json = result.json()
print(f"Received: {result}")
- state = result_json['state']
+ state = None
+ for dag_run in result_json['dag_runs']:
+ if dag_run['execution_date'] == execution_date:
+ state = dag_run['state']
check_call(["echo", f"Attempt {tries}: Current state of dag is
{state}"])
print(f"Attempt {tries}: Current state of dag is {state}")
@@ -157,16 +160,16 @@ class TestKubernetesExecutor(unittest.TestCase):
# Maybe check if we can retrieve the logs, but then we need to extend
the API
def start_dag(self, dag_id, host):
- get_string =
f'http://{host}/api/experimental/dags/{dag_id}/paused/false'
- print(f"Calling [start_dag]#1 {get_string}")
- result = self.session.get(get_string)
+ patch_string = f'http://{host}/api/v1/dags/{dag_id}'
+ print(f"Calling [start_dag]#1 {patch_string}")
+ result = self.session.patch(patch_string, json={'is_paused': False})
try:
result_json = result.json()
except ValueError:
result_json = str(result)
print(f"Received [start_dag]#1 {result_json}")
assert result.status_code == 200, f"Could not enable DAG:
{result_json}"
- post_string = f'http://{host}/api/experimental/dags/{dag_id}/dag_runs'
+ post_string = f'http://{host}/api/v1/dags/{dag_id}/dagRuns'
print(f"Calling [start_dag]#2 {post_string}")
# Trigger a new dagrun
result = self.session.post(post_string, json={})
@@ -179,36 +182,38 @@ class TestKubernetesExecutor(unittest.TestCase):
time.sleep(1)
- get_string = f'http://{host}/api/experimental/latest_runs'
+ get_string = f'http://{host}/api/v1/dags/{dag_id}/dagRuns'
print(f"Calling [start_dag]#3 {get_string}")
result = self.session.get(get_string)
- assert result.status_code == 200, f"Could not get the latest DAG-run:
{result.json()}"
+ assert result.status_code == 200, f"Could not get DAGRuns:
{result.json()}"
result_json = result.json()
print(f"Received: [start_dag]#3 {result_json}")
return result_json
def start_job_in_kubernetes(self, dag_id, host):
result_json = self.start_dag(dag_id=dag_id, host=host)
- assert len(result_json['items']) > 0
+ dag_runs = result_json['dag_runs']
+ assert len(dag_runs) > 0
execution_date = None
- for dag_run in result_json['items']:
+ dag_run_id = None
+ for dag_run in dag_runs:
if dag_run['dag_id'] == dag_id:
execution_date = dag_run['execution_date']
+ dag_run_id = dag_run['dag_run_id']
break
assert execution_date is not None, f"No execution_date can be found
for the dag with {dag_id}"
- return execution_date
+ return dag_run_id, execution_date
def test_integration_run_dag(self):
host = KUBERNETES_HOST_PORT
dag_id = 'example_kubernetes_executor_config'
-
- execution_date = self.start_job_in_kubernetes(dag_id, host)
- print(f"Found the job with execution date {execution_date}")
+ dag_run_id, execution_date = self.start_job_in_kubernetes(dag_id, host)
+ print(f"Found the job with execution_date {execution_date}")
# Wait some time for the operator to complete
self.monitor_task(
host=host,
- execution_date=execution_date,
+ dag_run_id=dag_run_id,
dag_id=dag_id,
task_id='start_task',
expected_final_state='success',
@@ -227,7 +232,7 @@ class TestKubernetesExecutor(unittest.TestCase):
host = KUBERNETES_HOST_PORT
dag_id = 'example_kubernetes_executor_config'
- execution_date = self.start_job_in_kubernetes(dag_id, host)
+ dag_run_id, execution_date = self.start_job_in_kubernetes(dag_id, host)
self._delete_airflow_pod("scheduler")
@@ -236,7 +241,7 @@ class TestKubernetesExecutor(unittest.TestCase):
# Wait some time for the operator to complete
self.monitor_task(
host=host,
- execution_date=execution_date,
+ dag_run_id=dag_run_id,
dag_id=dag_id,
task_id='start_task',
expected_final_state='success',
@@ -245,7 +250,7 @@ class TestKubernetesExecutor(unittest.TestCase):
self.monitor_task(
host=host,
- execution_date=execution_date,
+ dag_run_id=dag_run_id,
dag_id=dag_id,
task_id='other_namespace_task',
expected_final_state='success',
diff --git a/scripts/ci/libraries/_kind.sh b/scripts/ci/libraries/_kind.sh
index a7e75d7..268effb 100644
--- a/scripts/ci/libraries/_kind.sh
+++ b/scripts/ci/libraries/_kind.sh
@@ -331,8 +331,7 @@ function kind::deploy_airflow_with_helm() {
--set "images.airflow.repository=${DOCKERHUB_USER}/${DOCKERHUB_REPO}" \
--set "images.airflow.tag=${AIRFLOW_PROD_BASE_TAG}-kubernetes" -v 1 \
--set "defaultAirflowTag=${AIRFLOW_PROD_BASE_TAG}-kubernetes" -v 1 \
- --set "config.api.auth_backend=airflow.api.auth.backend.default" \
- --set "config.api.enable_experimental_api=true" \
+ --set "config.api.auth_backend=airflow.api.auth.backend.basic_auth" \
--set "config.logging.logging_level=DEBUG" \
--set "executor=KubernetesExecutor"
echo