ashb commented on code in PR #56589:
URL: https://github.com/apache/airflow/pull/56589#discussion_r2447505486
##########
chart/templates/workers/worker-deployment.yaml:
##########
@@ -24,93 +24,146 @@
{{- $keda := .Values.workers.keda.enabled }}
{{- $hpa := and .Values.workers.hpa.enabled (not .Values.workers.keda.enabled)
}}
{{- if or (contains "CeleryExecutor" .Values.executor) (contains
"CeleryKubernetesExecutor" .Values.executor) }}
-{{- $nodeSelector := or .Values.workers.nodeSelector .Values.nodeSelector }}
-{{- $affinity := or .Values.workers.affinity .Values.affinity }}
-{{- $tolerations := or .Values.workers.tolerations .Values.tolerations }}
-{{- $topologySpreadConstraints := or .Values.workers.topologySpreadConstraints
.Values.topologySpreadConstraints }}
-{{- $revisionHistoryLimit := or .Values.workers.revisionHistoryLimit
.Values.revisionHistoryLimit }}
-{{- $securityContext := include "airflowPodSecurityContext" (list .
.Values.workers) }}
-{{- $containerSecurityContext := include "containerSecurityContext" (list .
.Values.workers) }}
-{{- $containerSecurityContextPersistence := include "containerSecurityContext"
(list . .Values.workers.persistence) }}
-{{- $containerSecurityContextWaitForMigrations := include
"containerSecurityContext" (list . .Values.workers.waitForMigrations) }}
-{{- $containerSecurityContextLogGroomerSidecar := include
"containerSecurityContext" (list . .Values.workers.logGroomerSidecar) }}
-{{- $containerSecurityContextKerberosSidecar := include
"containerSecurityContext" (list . .Values.workers.kerberosSidecar) }}
-{{- $containerLifecycleHooks := or .Values.workers.containerLifecycleHooks
.Values.containerLifecycleHooks }}
-{{- $containerLifecycleHooksLogGroomerSidecar := or
.Values.workers.logGroomerSidecar.containerLifecycleHooks
.Values.containerLifecycleHooks }}
-{{- $containerLifecycleHooksKerberosSidecar := or
.Values.workers.kerberosSidecar.containerLifecycleHooks
.Values.containerLifecycleHooks }}
-{{- $safeToEvict := dict "cluster-autoscaler.kubernetes.io/safe-to-evict"
(.Values.workers.safeToEvict | toString) }}
-{{- $podAnnotations := mergeOverwrite (deepCopy .Values.airflowPodAnnotations)
$safeToEvict .Values.workers.podAnnotations }}
-{{- $schedulerName := or .Values.workers.schedulerName .Values.schedulerName }}
+
+{{- /* Build worker groups list: use celeryQueueGroups if defined, otherwise
create default worker */ -}}
+{{- $workerGroups := list }}
+{{- if .Values.workers.celery.celeryQueueGroups }}
+ {{- $workerGroups = .Values.workers.celery.celeryQueueGroups }}
+{{- else }}
+ {{- $defaultWorker := dict
+ "name" ""
+ "queues" ""
+ "replicas" .Values.workers.replicas
+ "nodeSelector" (or .Values.workers.nodeSelector .Values.nodeSelector)
+ "tolerations" (or .Values.workers.tolerations .Values.tolerations)
+ "resources" .Values.workers.resources
+ "labels" .Values.workers.labels
+ "podAnnotations" .Values.workers.podAnnotations
+ "priorityClassName" .Values.workers.priorityClassName
+ "env" .Values.workers.env }}
+ {{- $workerGroups = list $defaultWorker }}
+{{- end }}
+
+{{- /* Loop through all worker groups (either celeryQueueGroups or default) */
-}}
+{{- range $groupIndex, $workerGroup := $workerGroups }}
+{{- if $groupIndex }}
+---
+{{- end }}
+
+{{- /* Set variables based on whether this is default worker or custom group
*/ -}}
+{{- $isDefaultWorker := eq $workerGroup.name "" }}
+{{- $workerName := ternary "worker" (printf "worker-%s" $workerGroup.name)
$isDefaultWorker }}
+{{- $groupQueues := $workerGroup.queues }}
+{{- $groupReplicas := $workerGroup.replicas | default 1 }}
+{{- $groupNodeSelector := $workerGroup.nodeSelector | default dict }}
+{{- $groupTolerations := $workerGroup.tolerations | default list }}
+{{- $groupResources := $workerGroup.resources | default dict }}
+{{- $groupLabels := $workerGroup.labels | default dict }}
+{{- $groupPodAnnotations := $workerGroup.podAnnotations | default dict }}
+{{- $groupPriorityClassName := $workerGroup.priorityClassName | default "" }}
+{{- $groupEnv := $workerGroup.env | default list }}
+
+{{- /* Standard variables (same for all workers) */ -}}
+{{- $nodeSelector := or $groupNodeSelector $.Values.nodeSelector }}
+{{- $affinity := or $.Values.workers.affinity $.Values.affinity }}
+{{- $tolerations := or $groupTolerations $.Values.tolerations }}
+{{- $topologySpreadConstraints := or
$.Values.workers.topologySpreadConstraints $.Values.topologySpreadConstraints }}
+{{- $revisionHistoryLimit := or $.Values.workers.revisionHistoryLimit
$.Values.revisionHistoryLimit }}
+{{- $securityContext := include "airflowPodSecurityContext" (list $
$.Values.workers) }}
+{{- $containerSecurityContext := include "containerSecurityContext" (list $
$.Values.workers) }}
+{{- $containerSecurityContextPersistence := include "containerSecurityContext"
(list $ $.Values.workers.persistence) }}
+{{- $containerSecurityContextWaitForMigrations := include
"containerSecurityContext" (list $ $.Values.workers.waitForMigrations) }}
+{{- $containerSecurityContextLogGroomerSidecar := include
"containerSecurityContext" (list $ $.Values.workers.logGroomerSidecar) }}
+{{- $containerSecurityContextKerberosSidecar := include
"containerSecurityContext" (list $ $.Values.workers.kerberosSidecar) }}
+{{- $containerLifecycleHooks := or $.Values.workers.containerLifecycleHooks
$.Values.containerLifecycleHooks }}
+{{- $containerLifecycleHooksLogGroomerSidecar := or
$.Values.workers.logGroomerSidecar.containerLifecycleHooks
$.Values.containerLifecycleHooks }}
+{{- $containerLifecycleHooksKerberosSidecar := or
$.Values.workers.kerberosSidecar.containerLifecycleHooks
$.Values.containerLifecycleHooks }}
+{{- $safeToEvict := dict "cluster-autoscaler.kubernetes.io/safe-to-evict"
($.Values.workers.safeToEvict | toString) }}
+{{- $mergedPodAnnotations := mergeOverwrite (deepCopy
$.Values.airflowPodAnnotations) $safeToEvict $groupPodAnnotations }}
+{{- $schedulerName := or $.Values.workers.schedulerName $.Values.schedulerName
}}
apiVersion: apps/v1
kind: {{ if $persistence }}StatefulSet{{ else }}Deployment{{ end }}
metadata:
- name: {{ include "airflow.fullname" . }}-worker
+ name: {{ include "airflow.fullname" $ }}-{{ $workerName }}
labels:
tier: airflow
component: worker
- release: {{ .Release.Name }}
- chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
- heritage: {{ .Release.Service }}
- {{- with .Values.labels }}
+ {{- if not $isDefaultWorker }}
+ worker-group: {{ $workerGroup.name }}
+ {{- end }}
+ release: {{ $.Release.Name }}
+ chart: "{{ $.Chart.Name }}-{{ $.Chart.Version }}"
+ heritage: {{ $.Release.Service }}
+ {{- with $.Values.labels }}
{{- toYaml . | nindent 4 }}
{{- end }}
- {{- if .Values.workers.annotations }}
- annotations: {{- toYaml .Values.workers.annotations | nindent 4 }}
+ {{- if $.Values.workers.annotations }}
+ annotations: {{- toYaml $.Values.workers.annotations | nindent 4 }}
{{- end }}
spec:
{{- if $persistence }}
- serviceName: {{ include "airflow.fullname" . }}-worker
+ serviceName: {{ include "airflow.fullname" $ }}-{{ $workerName }}
{{- end }}
- {{- if and (not $keda) (not $hpa) }}
- replicas: {{ .Values.workers.replicas }}
+ {{- if not $isDefaultWorker }}
+ replicas: {{ $groupReplicas }}
+ {{- else if and (not $keda) (not $hpa) }}
+ replicas: {{ $.Values.workers.replicas }}
Review Comment:
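Why does KEDA (or HPA) not apply to the worker groups? For non-default groups
`replicas` is always rendered from `$groupReplicas`, whereas the default worker
only sets `replicas` when neither KEDA nor HPA is enabled.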
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]