This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f7ad549f2d Add an option to use a direct DB connection in KEDA when
pgbouncer is enabled (#32608)
f7ad549f2d is described below
commit f7ad549f2d7119a6496e3e66c43f078fbcc98ec1
Author: Hussein Awala <[email protected]>
AuthorDate: Sat Jul 15 22:52:38 2023 +0200
Add an option to use a direct DB connection in KEDA when pgbouncer is
enabled (#32608)
* Add an option to use a direct DB connection in KEDA when pgbouncer is
enabled
Signed-off-by: Hussein Awala <[email protected]>
* Fix helm doc
---------
Signed-off-by: Hussein Awala <[email protected]>
---
chart/templates/_helpers.yaml | 7 ++
.../secrets/metadata-connection-secret.yaml | 5 +
chart/templates/workers/worker-kedaautoscaler.yaml | 4 +
chart/values.schema.json | 5 +
chart/values.yaml | 4 +
helm_tests/other/test_keda.py | 108 +++++++++++++++++++++
6 files changed, 133 insertions(+)
diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml
index fc805e56b5..6d4d4bac8d 100644
--- a/chart/templates/_helpers.yaml
+++ b/chart/templates/_helpers.yaml
@@ -67,6 +67,13 @@ If release name contains chart name it will be used as a
full name.
name: {{ template "airflow_metadata_secret" . }}
key: connection
{{- end }}
+ {{- if and .Values.workers.keda.enabled .Values.pgbouncer.enabled (not
.Values.workers.keda.usePgbouncer) }}
+ - name: KEDA_DB_CONN
+ valueFrom:
+ secretKeyRef:
+ name: {{ template "airflow_metadata_secret" . }}
+ key: kedaConnection
+ {{- end }}
{{- if .Values.enableBuiltInSecretEnvVars.AIRFLOW__WEBSERVER__SECRET_KEY }}
- name: AIRFLOW__WEBSERVER__SECRET_KEY
valueFrom:
diff --git a/chart/templates/secrets/metadata-connection-secret.yaml
b/chart/templates/secrets/metadata-connection-secret.yaml
index 6a3c839ca0..c7597cd300 100644
--- a/chart/templates/secrets/metadata-connection-secret.yaml
+++ b/chart/templates/secrets/metadata-connection-secret.yaml
@@ -45,4 +45,9 @@ data:
{{- with .Values.data.metadataConnection }}
connection: {{ urlJoin (dict "scheme" .protocol "userinfo" (printf "%s:%s"
(.user | urlquery) (.pass | urlquery) ) "host" (printf "%s:%s" $host $port)
"path" (printf "/%s" $database) "query" $query) | b64enc | quote }}
{{- end }}
+ {{- if and .Values.workers.keda.enabled .Values.pgbouncer.enabled (not
.Values.workers.keda.usePgbouncer) }}
+ {{- with .Values.data.metadataConnection }}
+ kedaConnection: {{ urlJoin (dict "scheme" .protocol "userinfo" (printf
"%s:%s" (.user | urlquery) (.pass | urlquery) ) "host" (printf "%s:%s"
$metadataHost $port) "path" (printf "/%s" $database) "query" $query) | b64enc |
quote }}
+ {{- end }}
+ {{- end }}
{{- end }}
diff --git a/chart/templates/workers/worker-kedaautoscaler.yaml
b/chart/templates/workers/worker-kedaautoscaler.yaml
index 44e17e445b..7ce3d519f7 100644
--- a/chart/templates/workers/worker-kedaautoscaler.yaml
+++ b/chart/templates/workers/worker-kedaautoscaler.yaml
@@ -50,6 +50,10 @@ spec:
- type: postgresql
metadata:
targetQueryValue: "1"
+ {{- if and .Values.pgbouncer.enabled (not
.Values.workers.keda.usePgbouncer) }}
+ connectionFromEnv: KEDA_DB_CONN
+ {{- else }}
connectionFromEnv: AIRFLOW_CONN_AIRFLOW_DB
+ {{- end }}
query: {{ tpl .Values.workers.keda.query . | quote }}
{{- end }}
diff --git a/chart/values.schema.json b/chart/values.schema.json
index cb33bb7b4d..a4a329321a 100644
--- a/chart/values.schema.json
+++ b/chart/values.schema.json
@@ -1474,6 +1474,11 @@
"description": "Query to use for KEDA autoscaling.
Must return a single integer.",
"type": "string",
"default": "SELECT ceil(COUNT(*)::decimal / {{
.Values.config.celery.worker_concurrency }}) FROM task_instance WHERE
(state='running' OR state='queued') {{- if eq .Values.executor
\"CeleryKubernetesExecutor\" }} AND queue != '{{
.Values.config.celery_kubernetes_executor.kubernetes_queue }}' {{- end }}"
+ },
+ "usePgbouncer": {
+ "description": "Weather to use PGBouncer to
connect to the database or not when it is enabled. This configuration will be
ignored if PGBouncer is not enabled.",
+ "type": "boolean",
+ "default": true
}
}
},
diff --git a/chart/values.yaml b/chart/values.yaml
index d0cfdc3a17..53d9630007 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -562,6 +562,10 @@ workers:
AND queue != '{{
.Values.config.celery_kubernetes_executor.kubernetes_queue }}'
{{- end }}
+ # Weather to use PGBouncer to connect to the database or not when it is
enabled
+ # This configuration will be ignored if PGBouncer is not enabled
+ usePgbouncer: true
+
persistence:
# Enable persistent volumes
enabled: true
diff --git a/helm_tests/other/test_keda.py b/helm_tests/other/test_keda.py
index 774b617b50..5fd6c14f8e 100644
--- a/helm_tests/other/test_keda.py
+++ b/helm_tests/other/test_keda.py
@@ -162,3 +162,111 @@ class TestKeda:
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
)
assert jmespath.search("spec.scaleTargetRef.kind", docs[0]) == kind
+
+ def test_default_keda_db_connection(self):
+ """Verify default keda db connection."""
+ import base64
+
+ docs = render_chart(
+ values={
+ "workers": {"keda": {"enabled": True}},
+ "executor": "CeleryExecutor",
+ },
+ show_only=[
+ "templates/workers/worker-deployment.yaml",
+ "templates/workers/worker-kedaautoscaler.yaml",
+ "templates/secrets/metadata-connection-secret.yaml",
+ ],
+ )
+ worker_deployment = docs[0]
+ keda_autoscaler = docs[1]
+ metadata_connection_secret = docs[2]
+
+ worker_container_env_vars = jmespath.search(
+ "spec.template.spec.containers[?name=='worker'].env[].name",
worker_deployment
+ )
+ assert "AIRFLOW_CONN_AIRFLOW_DB" in worker_container_env_vars
+ assert "KEDA_DB_CONN" not in worker_container_env_vars
+
+ secret_data = jmespath.search("data", metadata_connection_secret)
+ assert "connection" in secret_data.keys()
+ assert "@release-name-postgresql" in
base64.b64decode(secret_data["connection"]).decode()
+ assert "kedaConnection" not in secret_data.keys()
+
+ autoscaler_connection_env_var = jmespath.search(
+ "spec.triggers[0].metadata.connectionFromEnv", keda_autoscaler
+ )
+ assert autoscaler_connection_env_var == "AIRFLOW_CONN_AIRFLOW_DB"
+
+ def test_default_keda_db_connection_pgbouncer_enabled(self):
+ """Verify keda db connection when pgbouncer is enabled."""
+ import base64
+
+ docs = render_chart(
+ values={
+ "workers": {"keda": {"enabled": True}},
+ "executor": "CeleryExecutor",
+ "pgbouncer": {"enabled": True},
+ },
+ show_only=[
+ "templates/workers/worker-deployment.yaml",
+ "templates/workers/worker-kedaautoscaler.yaml",
+ "templates/secrets/metadata-connection-secret.yaml",
+ ],
+ )
+ worker_deployment = docs[0]
+ keda_autoscaler = docs[1]
+ metadata_connection_secret = docs[2]
+
+ worker_container_env_vars = jmespath.search(
+ "spec.template.spec.containers[?name=='worker'].env[].name",
worker_deployment
+ )
+ assert "AIRFLOW_CONN_AIRFLOW_DB" in worker_container_env_vars
+ assert "KEDA_DB_CONN" not in worker_container_env_vars
+
+ secret_data = jmespath.search("data", metadata_connection_secret)
+ assert "connection" in secret_data.keys()
+ assert "@release-name-pgbouncer" in
base64.b64decode(secret_data["connection"]).decode()
+ assert "kedaConnection" not in secret_data.keys()
+
+ autoscaler_connection_env_var = jmespath.search(
+ "spec.triggers[0].metadata.connectionFromEnv", keda_autoscaler
+ )
+ assert autoscaler_connection_env_var == "AIRFLOW_CONN_AIRFLOW_DB"
+
+ def
test_default_keda_db_connection_pgbouncer_enabled_usePgbouncer_false(self):
+ """Verify keda db connection when pgbouncer is enabled and
usePgbouncer is false."""
+ import base64
+
+ docs = render_chart(
+ values={
+ "workers": {"keda": {"enabled": True, "usePgbouncer": False}},
+ "executor": "CeleryExecutor",
+ "pgbouncer": {"enabled": True},
+ },
+ show_only=[
+ "templates/workers/worker-deployment.yaml",
+ "templates/workers/worker-kedaautoscaler.yaml",
+ "templates/secrets/metadata-connection-secret.yaml",
+ ],
+ )
+ worker_deployment = docs[0]
+ keda_autoscaler = docs[1]
+ metadata_connection_secret = docs[2]
+
+ worker_container_env_vars = jmespath.search(
+ "spec.template.spec.containers[?name=='worker'].env[].name",
worker_deployment
+ )
+ assert "AIRFLOW_CONN_AIRFLOW_DB" in worker_container_env_vars
+ assert "KEDA_DB_CONN" in worker_container_env_vars
+
+ secret_data = jmespath.search("data", metadata_connection_secret)
+ assert "connection" in secret_data.keys()
+ assert "@release-name-pgbouncer" in
base64.b64decode(secret_data["connection"]).decode()
+ assert "kedaConnection" in secret_data.keys()
+ assert "@release-name-postgresql" in
base64.b64decode(secret_data["kedaConnection"]).decode()
+
+ autoscaler_connection_env_var = jmespath.search(
+ "spec.triggers[0].metadata.connectionFromEnv", keda_autoscaler
+ )
+ assert autoscaler_connection_env_var == "KEDA_DB_CONN"