This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ef9dc46f3 Adds support for custom query in workers KEDA trigger 
(#32308)
9ef9dc46f3 is described below

commit 9ef9dc46f393a9e093b2d30928c30e72508b9394
Author: Hussein Awala <[email protected]>
AuthorDate: Tue Jul 4 22:04:22 2023 +0200

    Adds support for custom query in workers KEDA trigger (#32308)
    
    Signed-off-by: Hussein Awala <[email protected]>
---
 chart/templates/workers/worker-kedaautoscaler.yaml |  8 +---
 chart/values.schema.json                           |  5 +++
 chart/values.yaml                                  |  9 +++++
 tests/charts/airflow_core/test_worker.py           | 45 ++++++++++++++++++++++
 4 files changed, 60 insertions(+), 7 deletions(-)

diff --git a/chart/templates/workers/worker-kedaautoscaler.yaml 
b/chart/templates/workers/worker-kedaautoscaler.yaml
index 036e172d89..44e17e445b 100644
--- a/chart/templates/workers/worker-kedaautoscaler.yaml
+++ b/chart/templates/workers/worker-kedaautoscaler.yaml
@@ -51,11 +51,5 @@ spec:
       metadata:
         targetQueryValue: "1"
         connectionFromEnv: AIRFLOW_CONN_AIRFLOW_DB
-        query: >-
-          SELECT ceil(COUNT(*)::decimal / {{ 
.Values.config.celery.worker_concurrency }})
-          FROM task_instance
-          WHERE (state='running' OR state='queued')
-          {{- if eq .Values.executor "CeleryKubernetesExecutor" }}
-          AND queue != '{{ 
.Values.config.celery_kubernetes_executor.kubernetes_queue }}'
-          {{- end }}
+        query: {{ tpl .Values.workers.keda.query . | quote }}
 {{- end }}
diff --git a/chart/values.schema.json b/chart/values.schema.json
index 80e25b9b10..56689532fb 100644
--- a/chart/values.schema.json
+++ b/chart/values.schema.json
@@ -1433,6 +1433,11 @@
                                     }
                                 }
                             }
+                        },
+                        "query": {
+                            "description": "Query to use for KEDA autoscaling. 
Must return a single integer.",
+                            "type": "string",
+                            "default": "SELECT ceil(COUNT(*)::decimal / {{ 
.Values.config.celery.worker_concurrency }}) FROM task_instance WHERE 
(state='running' OR state='queued') {{- if eq .Values.executor 
\"CeleryKubernetesExecutor\" }} AND queue != '{{ 
.Values.config.celery_kubernetes_executor.kubernetes_queue }}' {{- end }}"
                         }
                     }
                 },
diff --git a/chart/values.yaml b/chart/values.yaml
index 3beeb3765c..3b07b17049 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -538,6 +538,15 @@ workers:
     #           value: 100
     #           periodSeconds: 15
 
+    # Query to use for KEDA autoscaling. Must return a single integer.
+    query: >-
+      SELECT ceil(COUNT(*)::decimal / {{ 
.Values.config.celery.worker_concurrency }})
+      FROM task_instance
+      WHERE (state='running' OR state='queued')
+      {{- if eq .Values.executor "CeleryKubernetesExecutor" }}
+      AND queue != '{{ 
.Values.config.celery_kubernetes_executor.kubernetes_queue }}'
+      {{- end }}
+
   persistence:
     # Enable persistent volumes
     enabled: true
diff --git a/tests/charts/airflow_core/test_worker.py 
b/tests/charts/airflow_core/test_worker.py
index a49b78fee4..f4e02024a0 100644
--- a/tests/charts/airflow_core/test_worker.py
+++ b/tests/charts/airflow_core/test_worker.py
@@ -640,6 +640,51 @@ class TestWorkerKedaAutoScaler:
 
         assert "replicas" not in jmespath.search("spec", docs[0])
 
+    @pytest.mark.parametrize(
+        "query, executor, expected_query",
+        [
+            # default query with CeleryExecutor
+            (
+                None,
+                "CeleryExecutor",
+                "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance"
+                " WHERE (state='running' OR state='queued')",
+            ),
+            # default query with CeleryKubernetesExecutor
+            (
+                None,
+                "CeleryKubernetesExecutor",
+                "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance"
+                " WHERE (state='running' OR state='queued') AND queue != 
'kubernetes'",
+            ),
+            # test custom static query
+            (
+                "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance",
+                "CeleryKubernetesExecutor",
+                "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance",
+            ),
+            # test custom template query
+            (
+                "SELECT ceil(COUNT(*)::decimal / {{ mul 
.Values.config.celery.worker_concurrency 2 }})"
+                " FROM task_instance",
+                "CeleryKubernetesExecutor",
+                "SELECT ceil(COUNT(*)::decimal / 32) FROM task_instance",
+            ),
+        ],
+    )
+    def test_should_use_keda_query(self, query, executor, expected_query):
+
+        docs = render_chart(
+            values={
+                "executor": executor,
+                "workers": {
+                    "keda": {"enabled": True, **({"query": query} if query 
else {})},
+                },
+            },
+            show_only=["templates/workers/worker-kedaautoscaler.yaml"],
+        )
+        assert expected_query == 
jmespath.search("spec.triggers[0].metadata.query", docs[0])
+
 
 class TestWorkerNetworkPolicy:
     def test_should_add_component_specific_labels(self):

Reply via email to