jscheffl commented on code in PR #56589:
URL: https://github.com/apache/airflow/pull/56589#discussion_r2484890195


##########
chart/templates/workers/worker-deployment.yaml:
##########
@@ -24,93 +24,146 @@
 {{- $keda := .Values.workers.keda.enabled }}
 {{- $hpa := and .Values.workers.hpa.enabled (not .Values.workers.keda.enabled) 
}}
 {{- if or (contains "CeleryExecutor" .Values.executor) (contains 
"CeleryKubernetesExecutor" .Values.executor) }}
-{{- $nodeSelector := or .Values.workers.nodeSelector .Values.nodeSelector }}
-{{- $affinity := or .Values.workers.affinity .Values.affinity }}
-{{- $tolerations := or .Values.workers.tolerations .Values.tolerations }}
-{{- $topologySpreadConstraints := or .Values.workers.topologySpreadConstraints 
.Values.topologySpreadConstraints }}
-{{- $revisionHistoryLimit := or .Values.workers.revisionHistoryLimit 
.Values.revisionHistoryLimit }}
-{{- $securityContext := include "airflowPodSecurityContext" (list . 
.Values.workers) }}
-{{- $containerSecurityContext := include "containerSecurityContext" (list . 
.Values.workers) }}
-{{- $containerSecurityContextPersistence := include "containerSecurityContext" 
(list . .Values.workers.persistence) }}
-{{- $containerSecurityContextWaitForMigrations := include 
"containerSecurityContext" (list . .Values.workers.waitForMigrations) }}
-{{- $containerSecurityContextLogGroomerSidecar := include 
"containerSecurityContext" (list . .Values.workers.logGroomerSidecar) }}
-{{- $containerSecurityContextKerberosSidecar := include 
"containerSecurityContext" (list . .Values.workers.kerberosSidecar) }}
-{{- $containerLifecycleHooks := or .Values.workers.containerLifecycleHooks 
.Values.containerLifecycleHooks }}
-{{- $containerLifecycleHooksLogGroomerSidecar := or 
.Values.workers.logGroomerSidecar.containerLifecycleHooks 
.Values.containerLifecycleHooks }}
-{{- $containerLifecycleHooksKerberosSidecar := or 
.Values.workers.kerberosSidecar.containerLifecycleHooks 
.Values.containerLifecycleHooks }}
-{{- $safeToEvict := dict "cluster-autoscaler.kubernetes.io/safe-to-evict" 
(.Values.workers.safeToEvict | toString) }}
-{{- $podAnnotations := mergeOverwrite (deepCopy .Values.airflowPodAnnotations) 
$safeToEvict .Values.workers.podAnnotations }}
-{{- $schedulerName := or .Values.workers.schedulerName .Values.schedulerName }}
+
+{{- /* Build worker groups list: use celeryQueueGroups if defined, otherwise 
create default worker */ -}}
+{{- $workerGroups := list }}
+{{- if .Values.workers.celery.celeryQueueGroups }}
+  {{- $workerGroups = .Values.workers.celery.celeryQueueGroups }}
+{{- else }}
+  {{- $defaultWorker := dict
+    "name" ""
+    "queues" ""
+    "replicas" .Values.workers.replicas
+    "nodeSelector" (or .Values.workers.nodeSelector .Values.nodeSelector)
+    "tolerations" (or .Values.workers.tolerations .Values.tolerations)
+    "resources" .Values.workers.resources
+    "labels" .Values.workers.labels
+    "podAnnotations" .Values.workers.podAnnotations
+    "priorityClassName" .Values.workers.priorityClassName
+    "env" .Values.workers.env }}
+  {{- $workerGroups = list $defaultWorker }}
+{{- end }}
+
+{{- /* Loop through all worker groups (either celeryQueueGroups or default) */ 
-}}
+{{- range $groupIndex, $workerGroup := $workerGroups }}
+{{- if $groupIndex }}
+---
+{{- end }}
+
+{{- /* Set variables based on whether this is default worker or custom group 
*/ -}}
+{{- $isDefaultWorker := eq $workerGroup.name "" }}
+{{- $workerName := ternary "worker" (printf "worker-%s" $workerGroup.name) 
$isDefaultWorker }}
+{{- $groupQueues := $workerGroup.queues }}
+{{- $groupReplicas := $workerGroup.replicas | default 1 }}
+{{- $groupNodeSelector := $workerGroup.nodeSelector | default dict }}
+{{- $groupTolerations := $workerGroup.tolerations | default list }}
+{{- $groupResources := $workerGroup.resources | default dict }}
+{{- $groupLabels := $workerGroup.labels | default dict }}
+{{- $groupPodAnnotations := $workerGroup.podAnnotations | default dict }}
+{{- $groupPriorityClassName := $workerGroup.priorityClassName | default "" }}
+{{- $groupEnv := $workerGroup.env | default list }}
+
+{{- /* Standard variables (same for all workers) */ -}}
+{{- $nodeSelector := or $groupNodeSelector $.Values.nodeSelector }}
+{{- $affinity := or $.Values.workers.affinity $.Values.affinity }}
+{{- $tolerations := or $groupTolerations $.Values.tolerations }}
+{{- $topologySpreadConstraints := or 
$.Values.workers.topologySpreadConstraints $.Values.topologySpreadConstraints }}
+{{- $revisionHistoryLimit := or $.Values.workers.revisionHistoryLimit 
$.Values.revisionHistoryLimit }}
+{{- $securityContext := include "airflowPodSecurityContext" (list $ 
$.Values.workers) }}
+{{- $containerSecurityContext := include "containerSecurityContext" (list $ 
$.Values.workers) }}
+{{- $containerSecurityContextPersistence := include "containerSecurityContext" 
(list $ $.Values.workers.persistence) }}
+{{- $containerSecurityContextWaitForMigrations := include 
"containerSecurityContext" (list $ $.Values.workers.waitForMigrations) }}
+{{- $containerSecurityContextLogGroomerSidecar := include 
"containerSecurityContext" (list $ $.Values.workers.logGroomerSidecar) }}
+{{- $containerSecurityContextKerberosSidecar := include 
"containerSecurityContext" (list $ $.Values.workers.kerberosSidecar) }}
+{{- $containerLifecycleHooks := or $.Values.workers.containerLifecycleHooks 
$.Values.containerLifecycleHooks }}
+{{- $containerLifecycleHooksLogGroomerSidecar := or 
$.Values.workers.logGroomerSidecar.containerLifecycleHooks 
$.Values.containerLifecycleHooks }}
+{{- $containerLifecycleHooksKerberosSidecar := or 
$.Values.workers.kerberosSidecar.containerLifecycleHooks 
$.Values.containerLifecycleHooks }}
+{{- $safeToEvict := dict "cluster-autoscaler.kubernetes.io/safe-to-evict" 
($.Values.workers.safeToEvict | toString) }}
+{{- $mergedPodAnnotations := mergeOverwrite (deepCopy 
$.Values.airflowPodAnnotations) $safeToEvict $groupPodAnnotations }}
+{{- $schedulerName := or $.Values.workers.schedulerName $.Values.schedulerName 
}}
 apiVersion: apps/v1
 kind: {{ if $persistence }}StatefulSet{{ else }}Deployment{{ end }}
 metadata:
-  name: {{ include "airflow.fullname" . }}-worker
+  name: {{ include "airflow.fullname" $ }}-{{ $workerName }}
   labels:
     tier: airflow
     component: worker
-    release: {{ .Release.Name }}
-    chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
-    heritage: {{ .Release.Service }}
-    {{- with .Values.labels }}
+    {{- if not $isDefaultWorker }}
+    worker-group: {{ $workerGroup.name }}
+    {{- end }}
+    release: {{ $.Release.Name }}
+    chart: "{{ $.Chart.Name }}-{{ $.Chart.Version }}"
+    heritage: {{ $.Release.Service }}
+    {{- with $.Values.labels }}
       {{- toYaml . | nindent 4 }}
     {{- end }}
-  {{- if .Values.workers.annotations }}
-  annotations: {{- toYaml .Values.workers.annotations | nindent 4 }}
+  {{- if $.Values.workers.annotations }}
+  annotations: {{- toYaml $.Values.workers.annotations | nindent 4 }}
   {{- end }}
 spec:
   {{- if $persistence }}
-  serviceName: {{ include "airflow.fullname" . }}-worker
+  serviceName: {{ include "airflow.fullname" $ }}-{{ $workerName }}
   {{- end }}
-  {{- if and (not $keda) (not $hpa) }}
-  replicas: {{ .Values.workers.replicas }}
+  {{- if not $isDefaultWorker }}
+  replicas: {{ $groupReplicas }}
+  {{- else if and (not $keda) (not $hpa) }}
+  replicas: {{ $.Values.workers.replicas }}

Review Comment:
   So I assume that if worker groups are defined, they do not use
auto-scaling. Can you add this to the notes/docs/schema so that users who want
to use worker groups are aware of the limitation?
   
   I agree that supporting it would add complexity, as you would then need a
KEDA query / HPA definition per worker group. But this would be a meaningful
extension in my view.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to