HsiuChuanHsu commented on code in PR #56589:
URL: https://github.com/apache/airflow/pull/56589#discussion_r2448731706


##########
chart/templates/workers/worker-deployment.yaml:
##########
@@ -24,93 +24,146 @@
 {{- $keda := .Values.workers.keda.enabled }}
 {{- $hpa := and .Values.workers.hpa.enabled (not .Values.workers.keda.enabled) 
}}
 {{- if or (contains "CeleryExecutor" .Values.executor) (contains 
"CeleryKubernetesExecutor" .Values.executor) }}
-{{- $nodeSelector := or .Values.workers.nodeSelector .Values.nodeSelector }}
-{{- $affinity := or .Values.workers.affinity .Values.affinity }}
-{{- $tolerations := or .Values.workers.tolerations .Values.tolerations }}
-{{- $topologySpreadConstraints := or .Values.workers.topologySpreadConstraints 
.Values.topologySpreadConstraints }}
-{{- $revisionHistoryLimit := or .Values.workers.revisionHistoryLimit 
.Values.revisionHistoryLimit }}
-{{- $securityContext := include "airflowPodSecurityContext" (list . 
.Values.workers) }}
-{{- $containerSecurityContext := include "containerSecurityContext" (list . 
.Values.workers) }}
-{{- $containerSecurityContextPersistence := include "containerSecurityContext" 
(list . .Values.workers.persistence) }}
-{{- $containerSecurityContextWaitForMigrations := include 
"containerSecurityContext" (list . .Values.workers.waitForMigrations) }}
-{{- $containerSecurityContextLogGroomerSidecar := include 
"containerSecurityContext" (list . .Values.workers.logGroomerSidecar) }}
-{{- $containerSecurityContextKerberosSidecar := include 
"containerSecurityContext" (list . .Values.workers.kerberosSidecar) }}
-{{- $containerLifecycleHooks := or .Values.workers.containerLifecycleHooks 
.Values.containerLifecycleHooks }}
-{{- $containerLifecycleHooksLogGroomerSidecar := or 
.Values.workers.logGroomerSidecar.containerLifecycleHooks 
.Values.containerLifecycleHooks }}
-{{- $containerLifecycleHooksKerberosSidecar := or 
.Values.workers.kerberosSidecar.containerLifecycleHooks 
.Values.containerLifecycleHooks }}
-{{- $safeToEvict := dict "cluster-autoscaler.kubernetes.io/safe-to-evict" 
(.Values.workers.safeToEvict | toString) }}
-{{- $podAnnotations := mergeOverwrite (deepCopy .Values.airflowPodAnnotations) 
$safeToEvict .Values.workers.podAnnotations }}
-{{- $schedulerName := or .Values.workers.schedulerName .Values.schedulerName }}
+
+{{- /* Build worker groups list: use celeryQueueGroups if defined, otherwise 
create default worker */ -}}
+{{- $workerGroups := list }}
+{{- if .Values.workers.celery.celeryQueueGroups }}
+  {{- $workerGroups = .Values.workers.celery.celeryQueueGroups }}
+{{- else }}
+  {{- $defaultWorker := dict
+    "name" ""
+    "queues" ""
+    "replicas" .Values.workers.replicas
+    "nodeSelector" (or .Values.workers.nodeSelector .Values.nodeSelector)
+    "tolerations" (or .Values.workers.tolerations .Values.tolerations)
+    "resources" .Values.workers.resources
+    "labels" .Values.workers.labels
+    "podAnnotations" .Values.workers.podAnnotations
+    "priorityClassName" .Values.workers.priorityClassName
+    "env" .Values.workers.env }}
+  {{- $workerGroups = list $defaultWorker }}
+{{- end }}
+
+{{- /* Loop through all worker groups (either celeryQueueGroups or default) */ 
-}}
+{{- range $groupIndex, $workerGroup := $workerGroups }}
+{{- if $groupIndex }}
+---
+{{- end }}
+
+{{- /* Set variables based on whether this is default worker or custom group 
*/ -}}
+{{- $isDefaultWorker := eq $workerGroup.name "" }}
+{{- $workerName := ternary "worker" (printf "worker-%s" $workerGroup.name) 
$isDefaultWorker }}
+{{- $groupQueues := $workerGroup.queues }}
+{{- $groupReplicas := $workerGroup.replicas | default 1 }}
+{{- $groupNodeSelector := $workerGroup.nodeSelector | default dict }}
+{{- $groupTolerations := $workerGroup.tolerations | default list }}
+{{- $groupResources := $workerGroup.resources | default dict }}
+{{- $groupLabels := $workerGroup.labels | default dict }}
+{{- $groupPodAnnotations := $workerGroup.podAnnotations | default dict }}
+{{- $groupPriorityClassName := $workerGroup.priorityClassName | default "" }}
+{{- $groupEnv := $workerGroup.env | default list }}
+
+{{- /* Standard variables (same for all workers) */ -}}
+{{- $nodeSelector := or $groupNodeSelector $.Values.nodeSelector }}
+{{- $affinity := or $.Values.workers.affinity $.Values.affinity }}
+{{- $tolerations := or $groupTolerations $.Values.tolerations }}
+{{- $topologySpreadConstraints := or 
$.Values.workers.topologySpreadConstraints $.Values.topologySpreadConstraints }}
+{{- $revisionHistoryLimit := or $.Values.workers.revisionHistoryLimit 
$.Values.revisionHistoryLimit }}
+{{- $securityContext := include "airflowPodSecurityContext" (list $ 
$.Values.workers) }}
+{{- $containerSecurityContext := include "containerSecurityContext" (list $ 
$.Values.workers) }}
+{{- $containerSecurityContextPersistence := include "containerSecurityContext" 
(list $ $.Values.workers.persistence) }}
+{{- $containerSecurityContextWaitForMigrations := include 
"containerSecurityContext" (list $ $.Values.workers.waitForMigrations) }}
+{{- $containerSecurityContextLogGroomerSidecar := include 
"containerSecurityContext" (list $ $.Values.workers.logGroomerSidecar) }}
+{{- $containerSecurityContextKerberosSidecar := include 
"containerSecurityContext" (list $ $.Values.workers.kerberosSidecar) }}
+{{- $containerLifecycleHooks := or $.Values.workers.containerLifecycleHooks 
$.Values.containerLifecycleHooks }}
+{{- $containerLifecycleHooksLogGroomerSidecar := or 
$.Values.workers.logGroomerSidecar.containerLifecycleHooks 
$.Values.containerLifecycleHooks }}
+{{- $containerLifecycleHooksKerberosSidecar := or 
$.Values.workers.kerberosSidecar.containerLifecycleHooks 
$.Values.containerLifecycleHooks }}
+{{- $safeToEvict := dict "cluster-autoscaler.kubernetes.io/safe-to-evict" 
($.Values.workers.safeToEvict | toString) }}
+{{- $mergedPodAnnotations := mergeOverwrite (deepCopy 
$.Values.airflowPodAnnotations) $safeToEvict $groupPodAnnotations }}
+{{- $schedulerName := or $.Values.workers.schedulerName $.Values.schedulerName 
}}
 apiVersion: apps/v1
 kind: {{ if $persistence }}StatefulSet{{ else }}Deployment{{ end }}
 metadata:
-  name: {{ include "airflow.fullname" . }}-worker
+  name: {{ include "airflow.fullname" $ }}-{{ $workerName }}
   labels:
     tier: airflow
     component: worker
-    release: {{ .Release.Name }}
-    chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
-    heritage: {{ .Release.Service }}
-    {{- with .Values.labels }}
+    {{- if not $isDefaultWorker }}
+    worker-group: {{ $workerGroup.name }}
+    {{- end }}
+    release: {{ $.Release.Name }}
+    chart: "{{ $.Chart.Name }}-{{ $.Chart.Version }}"
+    heritage: {{ $.Release.Service }}
+    {{- with $.Values.labels }}
       {{- toYaml . | nindent 4 }}
     {{- end }}
-  {{- if .Values.workers.annotations }}
-  annotations: {{- toYaml .Values.workers.annotations | nindent 4 }}
+  {{- if $.Values.workers.annotations }}
+  annotations: {{- toYaml $.Values.workers.annotations | nindent 4 }}
   {{- end }}
 spec:
   {{- if $persistence }}
-  serviceName: {{ include "airflow.fullname" . }}-worker
+  serviceName: {{ include "airflow.fullname" $ }}-{{ $workerName }}
   {{- end }}
-  {{- if and (not $keda) (not $hpa) }}
-  replicas: {{ .Values.workers.replicas }}
+  {{- if not $isDefaultWorker }}
+  replicas: {{ $groupReplicas }}
+  {{- else if and (not $keda) (not $hpa) }}
+  replicas: {{ $.Values.workers.replicas }}

Review Comment:
   I considered combining the `KEDA` and `HPA` changes into this PR, but I 
believe this would significantly increase the review difficulty. Given the 
volume of configuration involved in setting up both `KEDA` and `HPA`, I think 
it would be a lot of work. If possible, I would prefer to make them into a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to