This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f7ad549f2d Add an option to use a direct DB connection in KEDA when 
pgbouncer is enabled (#32608)
f7ad549f2d is described below

commit f7ad549f2d7119a6496e3e66c43f078fbcc98ec1
Author: Hussein Awala <[email protected]>
AuthorDate: Sat Jul 15 22:52:38 2023 +0200

    Add an option to use a direct DB connection in KEDA when pgbouncer is 
enabled (#32608)
    
    * Add an option to use a direct DB connection in KEDA when pgbouncer is 
enabled
    
    Signed-off-by: Hussein Awala <[email protected]>
    
    * Fix helm doc
    
    ---------
    
    Signed-off-by: Hussein Awala <[email protected]>
---
 chart/templates/_helpers.yaml                      |   7 ++
 .../secrets/metadata-connection-secret.yaml        |   5 +
 chart/templates/workers/worker-kedaautoscaler.yaml |   4 +
 chart/values.schema.json                           |   5 +
 chart/values.yaml                                  |   4 +
 helm_tests/other/test_keda.py                      | 108 +++++++++++++++++++++
 6 files changed, 133 insertions(+)

diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml
index fc805e56b5..6d4d4bac8d 100644
--- a/chart/templates/_helpers.yaml
+++ b/chart/templates/_helpers.yaml
@@ -67,6 +67,13 @@ If release name contains chart name it will be used as a 
full name.
         name: {{ template "airflow_metadata_secret" . }}
         key: connection
   {{- end }}
+  {{- if and .Values.workers.keda.enabled .Values.pgbouncer.enabled (not 
.Values.workers.keda.usePgbouncer) }}
+  - name: KEDA_DB_CONN
+    valueFrom:
+      secretKeyRef:
+        name: {{ template "airflow_metadata_secret" . }}
+        key: kedaConnection
+  {{- end }}
   {{- if .Values.enableBuiltInSecretEnvVars.AIRFLOW__WEBSERVER__SECRET_KEY }}
   - name: AIRFLOW__WEBSERVER__SECRET_KEY
     valueFrom:
diff --git a/chart/templates/secrets/metadata-connection-secret.yaml 
b/chart/templates/secrets/metadata-connection-secret.yaml
index 6a3c839ca0..c7597cd300 100644
--- a/chart/templates/secrets/metadata-connection-secret.yaml
+++ b/chart/templates/secrets/metadata-connection-secret.yaml
@@ -45,4 +45,9 @@ data:
   {{- with .Values.data.metadataConnection }}
   connection: {{ urlJoin (dict "scheme" .protocol "userinfo" (printf "%s:%s" 
(.user | urlquery) (.pass | urlquery) ) "host" (printf "%s:%s" $host $port) 
"path" (printf "/%s" $database) "query" $query) | b64enc | quote }}
   {{- end }}
+  {{- if and .Values.workers.keda.enabled .Values.pgbouncer.enabled (not 
.Values.workers.keda.usePgbouncer) }}
+  {{- with .Values.data.metadataConnection }}
+  kedaConnection: {{ urlJoin (dict "scheme" .protocol "userinfo" (printf 
"%s:%s" (.user | urlquery) (.pass | urlquery) ) "host" (printf "%s:%s" 
$metadataHost $port) "path" (printf "/%s" $database) "query" $query) | b64enc | 
quote }}
+  {{- end }}
+  {{- end }}
 {{- end }}
diff --git a/chart/templates/workers/worker-kedaautoscaler.yaml 
b/chart/templates/workers/worker-kedaautoscaler.yaml
index 44e17e445b..7ce3d519f7 100644
--- a/chart/templates/workers/worker-kedaautoscaler.yaml
+++ b/chart/templates/workers/worker-kedaautoscaler.yaml
@@ -50,6 +50,10 @@ spec:
     - type: postgresql
       metadata:
         targetQueryValue: "1"
+        {{- if and .Values.pgbouncer.enabled (not 
.Values.workers.keda.usePgbouncer) }}
+        connectionFromEnv: KEDA_DB_CONN
+        {{- else }}
         connectionFromEnv: AIRFLOW_CONN_AIRFLOW_DB
+        {{- end }}
         query: {{ tpl .Values.workers.keda.query . | quote }}
 {{- end }}
diff --git a/chart/values.schema.json b/chart/values.schema.json
index cb33bb7b4d..a4a329321a 100644
--- a/chart/values.schema.json
+++ b/chart/values.schema.json
@@ -1474,6 +1474,11 @@
                             "description": "Query to use for KEDA autoscaling. 
Must return a single integer.",
                             "type": "string",
                             "default": "SELECT ceil(COUNT(*)::decimal / {{ 
.Values.config.celery.worker_concurrency }}) FROM task_instance WHERE 
(state='running' OR state='queued') {{- if eq .Values.executor 
\"CeleryKubernetesExecutor\" }} AND queue != '{{ 
.Values.config.celery_kubernetes_executor.kubernetes_queue }}' {{- end }}"
+                        },
+                        "usePgbouncer": {
+                            "description": "Weather to use PGBouncer to 
connect to the database or not when it is enabled. This configuration will be 
ignored if PGBouncer is not enabled.",
+                            "type": "boolean",
+                            "default": true
                         }
                     }
                 },
diff --git a/chart/values.yaml b/chart/values.yaml
index d0cfdc3a17..53d9630007 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -562,6 +562,10 @@ workers:
       AND queue != '{{ 
.Values.config.celery_kubernetes_executor.kubernetes_queue }}'
       {{- end }}
 
+    # Weather to use PGBouncer to connect to the database or not when it is 
enabled
+    # This configuration will be ignored if PGBouncer is not enabled
+    usePgbouncer: true
+
   persistence:
     # Enable persistent volumes
     enabled: true
diff --git a/helm_tests/other/test_keda.py b/helm_tests/other/test_keda.py
index 774b617b50..5fd6c14f8e 100644
--- a/helm_tests/other/test_keda.py
+++ b/helm_tests/other/test_keda.py
@@ -162,3 +162,111 @@ class TestKeda:
             show_only=["templates/workers/worker-kedaautoscaler.yaml"],
         )
         assert jmespath.search("spec.scaleTargetRef.kind", docs[0]) == kind
+
+    def test_default_keda_db_connection(self):
+        """Verify default keda db connection."""
+        import base64
+
+        docs = render_chart(
+            values={
+                "workers": {"keda": {"enabled": True}},
+                "executor": "CeleryExecutor",
+            },
+            show_only=[
+                "templates/workers/worker-deployment.yaml",
+                "templates/workers/worker-kedaautoscaler.yaml",
+                "templates/secrets/metadata-connection-secret.yaml",
+            ],
+        )
+        worker_deployment = docs[0]
+        keda_autoscaler = docs[1]
+        metadata_connection_secret = docs[2]
+
+        worker_container_env_vars = jmespath.search(
+            "spec.template.spec.containers[?name=='worker'].env[].name", 
worker_deployment
+        )
+        assert "AIRFLOW_CONN_AIRFLOW_DB" in worker_container_env_vars
+        assert "KEDA_DB_CONN" not in worker_container_env_vars
+
+        secret_data = jmespath.search("data", metadata_connection_secret)
+        assert "connection" in secret_data.keys()
+        assert "@release-name-postgresql" in 
base64.b64decode(secret_data["connection"]).decode()
+        assert "kedaConnection" not in secret_data.keys()
+
+        autoscaler_connection_env_var = jmespath.search(
+            "spec.triggers[0].metadata.connectionFromEnv", keda_autoscaler
+        )
+        assert autoscaler_connection_env_var == "AIRFLOW_CONN_AIRFLOW_DB"
+
+    def test_default_keda_db_connection_pgbouncer_enabled(self):
+        """Verify keda db connection when pgbouncer is enabled."""
+        import base64
+
+        docs = render_chart(
+            values={
+                "workers": {"keda": {"enabled": True}},
+                "executor": "CeleryExecutor",
+                "pgbouncer": {"enabled": True},
+            },
+            show_only=[
+                "templates/workers/worker-deployment.yaml",
+                "templates/workers/worker-kedaautoscaler.yaml",
+                "templates/secrets/metadata-connection-secret.yaml",
+            ],
+        )
+        worker_deployment = docs[0]
+        keda_autoscaler = docs[1]
+        metadata_connection_secret = docs[2]
+
+        worker_container_env_vars = jmespath.search(
+            "spec.template.spec.containers[?name=='worker'].env[].name", 
worker_deployment
+        )
+        assert "AIRFLOW_CONN_AIRFLOW_DB" in worker_container_env_vars
+        assert "KEDA_DB_CONN" not in worker_container_env_vars
+
+        secret_data = jmespath.search("data", metadata_connection_secret)
+        assert "connection" in secret_data.keys()
+        assert "@release-name-pgbouncer" in 
base64.b64decode(secret_data["connection"]).decode()
+        assert "kedaConnection" not in secret_data.keys()
+
+        autoscaler_connection_env_var = jmespath.search(
+            "spec.triggers[0].metadata.connectionFromEnv", keda_autoscaler
+        )
+        assert autoscaler_connection_env_var == "AIRFLOW_CONN_AIRFLOW_DB"
+
+    def 
test_default_keda_db_connection_pgbouncer_enabled_usePgbouncer_false(self):
+        """Verify keda db connection when pgbouncer is enabled and 
usePgbouncer is false."""
+        import base64
+
+        docs = render_chart(
+            values={
+                "workers": {"keda": {"enabled": True, "usePgbouncer": False}},
+                "executor": "CeleryExecutor",
+                "pgbouncer": {"enabled": True},
+            },
+            show_only=[
+                "templates/workers/worker-deployment.yaml",
+                "templates/workers/worker-kedaautoscaler.yaml",
+                "templates/secrets/metadata-connection-secret.yaml",
+            ],
+        )
+        worker_deployment = docs[0]
+        keda_autoscaler = docs[1]
+        metadata_connection_secret = docs[2]
+
+        worker_container_env_vars = jmespath.search(
+            "spec.template.spec.containers[?name=='worker'].env[].name", 
worker_deployment
+        )
+        assert "AIRFLOW_CONN_AIRFLOW_DB" in worker_container_env_vars
+        assert "KEDA_DB_CONN" in worker_container_env_vars
+
+        secret_data = jmespath.search("data", metadata_connection_secret)
+        assert "connection" in secret_data.keys()
+        assert "@release-name-pgbouncer" in 
base64.b64decode(secret_data["connection"]).decode()
+        assert "kedaConnection" in secret_data.keys()
+        assert "@release-name-postgresql" in 
base64.b64decode(secret_data["kedaConnection"]).decode()
+
+        autoscaler_connection_env_var = jmespath.search(
+            "spec.triggers[0].metadata.connectionFromEnv", keda_autoscaler
+        )
+        assert autoscaler_connection_env_var == "KEDA_DB_CONN"

Reply via email to