This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9ef9dc46f3 Adds support for custom query in workers KEDA trigger
(#32308)
9ef9dc46f3 is described below
commit 9ef9dc46f393a9e093b2d30928c30e72508b9394
Author: Hussein Awala <[email protected]>
AuthorDate: Tue Jul 4 22:04:22 2023 +0200
Adds support for custom query in workers KEDA trigger (#32308)
Signed-off-by: Hussein Awala <[email protected]>
---
chart/templates/workers/worker-kedaautoscaler.yaml | 8 +---
chart/values.schema.json | 5 +++
chart/values.yaml | 9 +++++
tests/charts/airflow_core/test_worker.py | 45 ++++++++++++++++++++++
4 files changed, 60 insertions(+), 7 deletions(-)
diff --git a/chart/templates/workers/worker-kedaautoscaler.yaml
b/chart/templates/workers/worker-kedaautoscaler.yaml
index 036e172d89..44e17e445b 100644
--- a/chart/templates/workers/worker-kedaautoscaler.yaml
+++ b/chart/templates/workers/worker-kedaautoscaler.yaml
@@ -51,11 +51,5 @@ spec:
metadata:
targetQueryValue: "1"
connectionFromEnv: AIRFLOW_CONN_AIRFLOW_DB
- query: >-
- SELECT ceil(COUNT(*)::decimal / {{
.Values.config.celery.worker_concurrency }})
- FROM task_instance
- WHERE (state='running' OR state='queued')
- {{- if eq .Values.executor "CeleryKubernetesExecutor" }}
- AND queue != '{{
.Values.config.celery_kubernetes_executor.kubernetes_queue }}'
- {{- end }}
+ query: {{ tpl .Values.workers.keda.query . | quote }}
{{- end }}
diff --git a/chart/values.schema.json b/chart/values.schema.json
index 80e25b9b10..56689532fb 100644
--- a/chart/values.schema.json
+++ b/chart/values.schema.json
@@ -1433,6 +1433,11 @@
}
}
}
+ },
+ "query": {
+ "description": "Query to use for KEDA autoscaling.
Must return a single integer.",
+ "type": "string",
+ "default": "SELECT ceil(COUNT(*)::decimal / {{
.Values.config.celery.worker_concurrency }}) FROM task_instance WHERE
(state='running' OR state='queued') {{- if eq .Values.executor
\"CeleryKubernetesExecutor\" }} AND queue != '{{
.Values.config.celery_kubernetes_executor.kubernetes_queue }}' {{- end }}"
}
}
},
diff --git a/chart/values.yaml b/chart/values.yaml
index 3beeb3765c..3b07b17049 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -538,6 +538,15 @@ workers:
# value: 100
# periodSeconds: 15
+ # Query to use for KEDA autoscaling. Must return a single integer.
+ query: >-
+ SELECT ceil(COUNT(*)::decimal / {{
.Values.config.celery.worker_concurrency }})
+ FROM task_instance
+ WHERE (state='running' OR state='queued')
+ {{- if eq .Values.executor "CeleryKubernetesExecutor" }}
+ AND queue != '{{
.Values.config.celery_kubernetes_executor.kubernetes_queue }}'
+ {{- end }}
+
persistence:
# Enable persistent volumes
enabled: true
diff --git a/tests/charts/airflow_core/test_worker.py
b/tests/charts/airflow_core/test_worker.py
index a49b78fee4..f4e02024a0 100644
--- a/tests/charts/airflow_core/test_worker.py
+++ b/tests/charts/airflow_core/test_worker.py
@@ -640,6 +640,51 @@ class TestWorkerKedaAutoScaler:
assert "replicas" not in jmespath.search("spec", docs[0])
+ @pytest.mark.parametrize(
+ "query, executor, expected_query",
+ [
+ # default query with CeleryExecutor
+ (
+ None,
+ "CeleryExecutor",
+ "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance"
+ " WHERE (state='running' OR state='queued')",
+ ),
+ # default query with CeleryKubernetesExecutor
+ (
+ None,
+ "CeleryKubernetesExecutor",
+ "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance"
+ " WHERE (state='running' OR state='queued') AND queue !=
'kubernetes'",
+ ),
+ # test custom static query
+ (
+ "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance",
+ "CeleryKubernetesExecutor",
+ "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance",
+ ),
+ # test custom template query
+ (
+ "SELECT ceil(COUNT(*)::decimal / {{ mul
.Values.config.celery.worker_concurrency 2 }})"
+ " FROM task_instance",
+ "CeleryKubernetesExecutor",
+ "SELECT ceil(COUNT(*)::decimal / 32) FROM task_instance",
+ ),
+ ],
+ )
+ def test_should_use_keda_query(self, query, executor, expected_query):
+
+ docs = render_chart(
+ values={
+ "executor": executor,
+ "workers": {
+ "keda": {"enabled": True, **({"query": query} if query
else {})},
+ },
+ },
+ show_only=["templates/workers/worker-kedaautoscaler.yaml"],
+ )
+ assert expected_query ==
jmespath.search("spec.triggers[0].metadata.query", docs[0])
+
class TestWorkerNetworkPolicy:
def test_should_add_component_specific_labels(self):