ashb commented on code in PR #56589:
URL: https://github.com/apache/airflow/pull/56589#discussion_r2447505486


##########
chart/templates/workers/worker-deployment.yaml:
##########
@@ -24,93 +24,146 @@
 {{- $keda := .Values.workers.keda.enabled }}
 {{- $hpa := and .Values.workers.hpa.enabled (not .Values.workers.keda.enabled) 
}}
 {{- if or (contains "CeleryExecutor" .Values.executor) (contains 
"CeleryKubernetesExecutor" .Values.executor) }}
-{{- $nodeSelector := or .Values.workers.nodeSelector .Values.nodeSelector }}
-{{- $affinity := or .Values.workers.affinity .Values.affinity }}
-{{- $tolerations := or .Values.workers.tolerations .Values.tolerations }}
-{{- $topologySpreadConstraints := or .Values.workers.topologySpreadConstraints 
.Values.topologySpreadConstraints }}
-{{- $revisionHistoryLimit := or .Values.workers.revisionHistoryLimit 
.Values.revisionHistoryLimit }}
-{{- $securityContext := include "airflowPodSecurityContext" (list . 
.Values.workers) }}
-{{- $containerSecurityContext := include "containerSecurityContext" (list . 
.Values.workers) }}
-{{- $containerSecurityContextPersistence := include "containerSecurityContext" 
(list . .Values.workers.persistence) }}
-{{- $containerSecurityContextWaitForMigrations := include 
"containerSecurityContext" (list . .Values.workers.waitForMigrations) }}
-{{- $containerSecurityContextLogGroomerSidecar := include 
"containerSecurityContext" (list . .Values.workers.logGroomerSidecar) }}
-{{- $containerSecurityContextKerberosSidecar := include 
"containerSecurityContext" (list . .Values.workers.kerberosSidecar) }}
-{{- $containerLifecycleHooks := or .Values.workers.containerLifecycleHooks 
.Values.containerLifecycleHooks }}
-{{- $containerLifecycleHooksLogGroomerSidecar := or 
.Values.workers.logGroomerSidecar.containerLifecycleHooks 
.Values.containerLifecycleHooks }}
-{{- $containerLifecycleHooksKerberosSidecar := or 
.Values.workers.kerberosSidecar.containerLifecycleHooks 
.Values.containerLifecycleHooks }}
-{{- $safeToEvict := dict "cluster-autoscaler.kubernetes.io/safe-to-evict" 
(.Values.workers.safeToEvict | toString) }}
-{{- $podAnnotations := mergeOverwrite (deepCopy .Values.airflowPodAnnotations) 
$safeToEvict .Values.workers.podAnnotations }}
-{{- $schedulerName := or .Values.workers.schedulerName .Values.schedulerName }}
+
+{{- /* Build worker groups list: use celeryQueueGroups if defined, otherwise 
create default worker */ -}}
+{{- $workerGroups := list }}
+{{- if .Values.workers.celery.celeryQueueGroups }}
+  {{- $workerGroups = .Values.workers.celery.celeryQueueGroups }}
+{{- else }}
+  {{- $defaultWorker := dict
+    "name" ""
+    "queues" ""
+    "replicas" .Values.workers.replicas
+    "nodeSelector" (or .Values.workers.nodeSelector .Values.nodeSelector)
+    "tolerations" (or .Values.workers.tolerations .Values.tolerations)
+    "resources" .Values.workers.resources
+    "labels" .Values.workers.labels
+    "podAnnotations" .Values.workers.podAnnotations
+    "priorityClassName" .Values.workers.priorityClassName
+    "env" .Values.workers.env }}
+  {{- $workerGroups = list $defaultWorker }}
+{{- end }}
+
+{{- /* Loop through all worker groups (either celeryQueueGroups or default) */ 
-}}
+{{- range $groupIndex, $workerGroup := $workerGroups }}
+{{- if $groupIndex }}
+---
+{{- end }}
+
+{{- /* Set variables based on whether this is default worker or custom group 
*/ -}}
+{{- $isDefaultWorker := eq $workerGroup.name "" }}
+{{- $workerName := ternary "worker" (printf "worker-%s" $workerGroup.name) 
$isDefaultWorker }}
+{{- $groupQueues := $workerGroup.queues }}
+{{- $groupReplicas := $workerGroup.replicas | default 1 }}
+{{- $groupNodeSelector := $workerGroup.nodeSelector | default dict }}
+{{- $groupTolerations := $workerGroup.tolerations | default list }}
+{{- $groupResources := $workerGroup.resources | default dict }}
+{{- $groupLabels := $workerGroup.labels | default dict }}
+{{- $groupPodAnnotations := $workerGroup.podAnnotations | default dict }}
+{{- $groupPriorityClassName := $workerGroup.priorityClassName | default "" }}
+{{- $groupEnv := $workerGroup.env | default list }}
+
+{{- /* Standard variables (same for all workers) */ -}}
+{{- $nodeSelector := or $groupNodeSelector $.Values.nodeSelector }}
+{{- $affinity := or $.Values.workers.affinity $.Values.affinity }}
+{{- $tolerations := or $groupTolerations $.Values.tolerations }}
+{{- $topologySpreadConstraints := or 
$.Values.workers.topologySpreadConstraints $.Values.topologySpreadConstraints }}
+{{- $revisionHistoryLimit := or $.Values.workers.revisionHistoryLimit 
$.Values.revisionHistoryLimit }}
+{{- $securityContext := include "airflowPodSecurityContext" (list $ 
$.Values.workers) }}
+{{- $containerSecurityContext := include "containerSecurityContext" (list $ 
$.Values.workers) }}
+{{- $containerSecurityContextPersistence := include "containerSecurityContext" 
(list $ $.Values.workers.persistence) }}
+{{- $containerSecurityContextWaitForMigrations := include 
"containerSecurityContext" (list $ $.Values.workers.waitForMigrations) }}
+{{- $containerSecurityContextLogGroomerSidecar := include 
"containerSecurityContext" (list $ $.Values.workers.logGroomerSidecar) }}
+{{- $containerSecurityContextKerberosSidecar := include 
"containerSecurityContext" (list $ $.Values.workers.kerberosSidecar) }}
+{{- $containerLifecycleHooks := or $.Values.workers.containerLifecycleHooks 
$.Values.containerLifecycleHooks }}
+{{- $containerLifecycleHooksLogGroomerSidecar := or 
$.Values.workers.logGroomerSidecar.containerLifecycleHooks 
$.Values.containerLifecycleHooks }}
+{{- $containerLifecycleHooksKerberosSidecar := or 
$.Values.workers.kerberosSidecar.containerLifecycleHooks 
$.Values.containerLifecycleHooks }}
+{{- $safeToEvict := dict "cluster-autoscaler.kubernetes.io/safe-to-evict" 
($.Values.workers.safeToEvict | toString) }}
+{{- $mergedPodAnnotations := mergeOverwrite (deepCopy 
$.Values.airflowPodAnnotations) $safeToEvict $groupPodAnnotations }}
+{{- $schedulerName := or $.Values.workers.schedulerName $.Values.schedulerName 
}}
 apiVersion: apps/v1
 kind: {{ if $persistence }}StatefulSet{{ else }}Deployment{{ end }}
 metadata:
-  name: {{ include "airflow.fullname" . }}-worker
+  name: {{ include "airflow.fullname" $ }}-{{ $workerName }}
   labels:
     tier: airflow
     component: worker
-    release: {{ .Release.Name }}
-    chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
-    heritage: {{ .Release.Service }}
-    {{- with .Values.labels }}
+    {{- if not $isDefaultWorker }}
+    worker-group: {{ $workerGroup.name }}
+    {{- end }}
+    release: {{ $.Release.Name }}
+    chart: "{{ $.Chart.Name }}-{{ $.Chart.Version }}"
+    heritage: {{ $.Release.Service }}
+    {{- with $.Values.labels }}
       {{- toYaml . | nindent 4 }}
     {{- end }}
-  {{- if .Values.workers.annotations }}
-  annotations: {{- toYaml .Values.workers.annotations | nindent 4 }}
+  {{- if $.Values.workers.annotations }}
+  annotations: {{- toYaml $.Values.workers.annotations | nindent 4 }}
   {{- end }}
 spec:
   {{- if $persistence }}
-  serviceName: {{ include "airflow.fullname" . }}-worker
+  serviceName: {{ include "airflow.fullname" $ }}-{{ $workerName }}
   {{- end }}
-  {{- if and (not $keda) (not $hpa) }}
-  replicas: {{ .Values.workers.replicas }}
+  {{- if not $isDefaultWorker }}
+  replicas: {{ $groupReplicas }}
+  {{- else if and (not $keda) (not $hpa) }}
+  replicas: {{ $.Values.workers.replicas }}

Review Comment:
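   Why does KEDA (and HPA) not apply to the worker groups? For non-default
   groups `replicas` is always rendered, so the `$keda`/`$hpa` check above only
   guards the default worker.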



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to