ahmadfarhan97 opened a new issue, #30863:
URL: https://github.com/apache/airflow/issues/30863
### Official Helm Chart version
1.9.0 (latest released)
### Apache Airflow version
2.5.3
### Kubernetes Version
1.26.3
### Helm Chart configuration
```
fullnameOverride: ""
nameOverride: ""
kubeVersionOverride: ""
revisionHistoryLimit: ~
# User and group of airflow user
uid: 50000
gid: 0
# Default security context for airflow
securityContext: {}
# runAsUser: 50000
# fsGroup: 0
# runAsGroup: 0
# Airflow home directory
# Used for mount paths
airflowHome: /opt/airflow
# Default airflow repository -- overridden by all the specific images below
defaultAirflowRepository: airflow-custom
# Default airflow tag to deploy
defaultAirflowTag: "1.1.0"
# Airflow version (Used to make some decisions based on Airflow Version being deployed)
airflowVersion: "2.5.3"
# Images
images:
airflow:
repository: ~
tag: ~
pullPolicy: IfNotPresent
useDefaultImageForMigration: false
# timeout (in seconds) for airflow-migrations to complete
migrationsWaitTimeout: 60
pod_template:
repository: ~
tag: ~
pullPolicy: IfNotPresent
flower:
repository: ~
tag: ~
pullPolicy: IfNotPresent
statsd:
repository: quay.io/prometheus/statsd-exporter
tag: v0.22.8
pullPolicy: IfNotPresent
redis:
repository: redis
tag: 7-bullseye
pullPolicy: IfNotPresent
pgbouncer:
repository: apache/airflow
tag: airflow-pgbouncer-2021.04.28-1.14.0
pullPolicy: IfNotPresent
pgbouncerExporter:
repository: apache/airflow
tag: airflow-pgbouncer-exporter-2021.09.22-0.12.0
pullPolicy: IfNotPresent
gitSync:
repository: k8s.gcr.io/git-sync/git-sync
tag: v3.6.3
pullPolicy: IfNotPresent
# Select certain nodes for airflow pods.
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []
# Add common labels to all objects and pods defined in this chart.
labels: {}
# Ingress configuration
ingress:
# Enable all ingress resources (deprecated - use ingress.web.enabled and ingress.flower.enabled)
enabled: ~
# Configs for the Ingress of the web Service
web:
# Enable web ingress resource
enabled: false
# Annotations for the web Ingress
annotations: {}
# The path for the web Ingress
path: "/"
# The pathType for the above path (used only with Kubernetes v1.19 and above)
pathType: "ImplementationSpecific"
# The hostname for the web Ingress (Deprecated - renamed to `ingress.web.hosts`)
host: ""
# The hostnames or hosts configuration for the web Ingress
hosts: []
# The Ingress Class for the web Ingress (used only with Kubernetes v1.19 and above)
ingressClassName: ""
# configs for web Ingress TLS (Deprecated - renamed to `ingress.web.hosts[*].tls`)
tls:
# Enable TLS termination for the web Ingress
enabled: false
# the name of a pre-created Secret containing a TLS private key and certificate
secretName: ""
# HTTP paths to add to the web Ingress before the default path
precedingPaths: []
# HTTP paths to add to the web Ingress after the default path
succeedingPaths: []
# Configs for the Ingress of the flower Service
flower:
# Enable flower ingress resource
enabled: false
# Annotations for the flower Ingress
annotations: {}
# The path for the flower Ingress
path: "/"
# The pathType for the above path (used only with Kubernetes v1.19 and above)
pathType: "ImplementationSpecific"
# The hostname for the flower Ingress (Deprecated - renamed to `ingress.flower.hosts`)
host: ""
# The hostnames or hosts configuration for the flower Ingress
hosts: []
# The Ingress Class for the flower Ingress (used only with Kubernetes v1.19 and above)
ingressClassName: ""
# configs for flower Ingress TLS (Deprecated - renamed to `ingress.flower.hosts[*].tls`)
tls:
# Enable TLS termination for the flower Ingress
enabled: false
# the name of a pre-created Secret containing a TLS private key and certificate
secretName: ""
# Network policy configuration
networkPolicies:
# Enabled network policies
enabled: false
# Extra annotations to apply to all
# Airflow pods
airflowPodAnnotations: {}
# Extra annotations to apply to
# main Airflow configmap
airflowConfigAnnotations: {}
# `airflow_local_settings` file as a string (can be templated).
airflowLocalSettings: |-
{{- if semverCompare ">=2.2.0" .Values.airflowVersion }}
{{- if not (or .Values.webserverSecretKey .Values.webserverSecretKeySecretName) }}
from airflow.www.utils import UIAlert
DASHBOARD_UIALERTS = [
UIAlert(
'Usage of a dynamic webserver secret key detected. We recommend a static webserver secret key instead.'
' See the <a href='
'"https://airflow.apache.org/docs/helm-chart/stable/production-guide.html#webserver-secret-key">'
'Helm Chart Production Guide</a> for more details.',
category="warning",
roles=["Admin"],
html=True,
)
]
{{- end }}
{{- end }}
# Enable RBAC (default on most clusters these days)
rbac:
# Specifies whether RBAC resources should be created
create: true
createSCCRoleBinding: false
# Airflow executor
# One of: LocalExecutor, LocalKubernetesExecutor, CeleryExecutor, KubernetesExecutor, CeleryKubernetesExecutor
executor: "KubernetesExecutor"
allowPodLaunching: true
# Environment variables for all airflow containers
env: []
# - name: ""
# value: ""
# Volumes for all airflow containers
volumes: []
# VolumeMounts for all airflow containers
volumeMounts: []
# Secrets for all airflow containers
secret:
- envName: "AIRFLOW_CONN_SNOWFLAKE"
secretName: "airflow-snowflake-connection"
secretKey: "AIRFLOW_CONN_SNOWFLAKE"
- envName: "AIRFLOW_CONN_ADLS"
secretName: "azure-blob-storage-secret"
secretKey: "AIRFLOW_CONN_ADLS"
enableBuiltInSecretEnvVars:
AIRFLOW__CORE__FERNET_KEY: true
# For Airflow <2.3, backward compatibility; moved to [database] in 2.3
AIRFLOW__CORE__SQL_ALCHEMY_CONN: true
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: true
AIRFLOW_CONN_AIRFLOW_DB: true
AIRFLOW__WEBSERVER__SECRET_KEY: true
AIRFLOW__CELERY__CELERY_RESULT_BACKEND: true
AIRFLOW__CELERY__RESULT_BACKEND: true
AIRFLOW__CELERY__BROKER_URL: true
AIRFLOW__ELASTICSEARCH__HOST: true
AIRFLOW__ELASTICSEARCH__ELASTICSEARCH_HOST: true
extraSecrets: {}
extraConfigMaps: {}
extraEnv: ~
# Airflow database & redis config
data:
metadataSecretName: custom-airflow-metadata-secret
resultBackendSecretName: ~
brokerUrlSecretName: ~
# Otherwise pass connection values in
metadataConnection: ~
resultBackendConnection: ~
brokerUrl: ~
# Fernet key settings
# Note: fernetKey can only be set during install, not upgrade
fernetKey: ~
fernetKeySecretName: ~
# Flask secret key for Airflow Webserver: `[webserver] secret_key` in airflow.cfg
webserverSecretKey: ~
webserverSecretKeySecretName: ~
kerberos:
enabled: false
ccacheMountPath: /var/kerberos-ccache
ccacheFileName: cache
configPath: /etc/krb5.conf
keytabBase64Content: ~
keytabPath: /etc/airflow.keytab
principal: [email protected]
reinitFrequency: 3600
config: |
[logging]
default = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_libs.log"
kdc = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_kdc.log"
admin_server = "FILE:{{ template "airflow_logs_no_quote" . }}/kadmind.log"
[libdefaults]
default_realm = FOO.COM
ticket_lifetime = 10h
renew_lifetime = 7d
forwardable = true
[realms]
FOO.COM = {
kdc = kdc-server.foo.com
admin_server = admin_server.foo.com
}
# Airflow Worker Config
workers:
# Number of airflow celery workers in StatefulSet
replicas: 1
# Max number of old replicasets to retain
revisionHistoryLimit: ~
# Command to use when running Airflow workers (templated).
command: ~
# Args to use when running Airflow workers (templated).
args:
- "bash"
- "-c"
# The format below is necessary to get `helm lint` happy
- |-
exec \
airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "celery worker" "worker" }}
# If the worker stops responding for 5 minutes (5*60s) kill the
# worker and let Kubernetes restart it
livenessProbe:
enabled: true
initialDelaySeconds: 10
timeoutSeconds: 20
failureThreshold: 5
periodSeconds: 60
command: ~
# Update Strategy when worker is deployed as a StatefulSet
updateStrategy: ~
# Update Strategy when worker is deployed as a Deployment
strategy:
rollingUpdate:
maxSurge: "100%"
maxUnavailable: "50%"
# When not set, the values defined in the global securityContext will be used
securityContext: {}
# Create ServiceAccount
serviceAccount:
# Specifies whether a ServiceAccount should be created
create: true
# The name of the ServiceAccount to use.
# If not set and create is true, a name is generated using the release name
name: ~
# Annotations to add to worker kubernetes service account.
annotations: {}
# Allow KEDA autoscaling.
# Persistence.enabled must be set to false to use KEDA.
keda:
enabled: false
namespaceLabels: {}
# How often KEDA polls the airflow DB to report new scale requests to the HPA
pollingInterval: 5
# How many seconds KEDA will wait before scaling to zero.
# Note that HPA has a separate cooldown period for scale-downs
cooldownPeriod: 30
# Minimum number of workers created by keda
minReplicaCount: 0
# Maximum number of workers created by keda
maxReplicaCount: 10
# Specify HPA related options
advanced: {}
# horizontalPodAutoscalerConfig:
# behavior:
# scaleDown:
# stabilizationWindowSeconds: 300
# policies:
# - type: Percent
# value: 100
# periodSeconds: 15
persistence:
# Enable persistent volumes
enabled: true
# Volume size for worker StatefulSet
size: 100Gi
# If using a custom storageClass, pass name ref to all statefulSets here
storageClassName:
# Execute init container to chown log directory.
# This is currently only needed in kind, due to usage
# of local-path provisioner.
fixPermissions: false
# Annotations to add to worker volumes
annotations: {}
kerberosSidecar:
# Enable kerberos sidecar
enabled: false
resources: {}
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
resources: {}
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
# Grace period for tasks to finish after SIGTERM is sent from kubernetes
terminationGracePeriodSeconds: 600
# This setting tells kubernetes that it's ok to evict
# when it wants to scale a node down.
safeToEvict: true
# Launch additional containers into worker.
# Note: If used with KubernetesExecutor, you are responsible for signaling sidecars to exit when the main
# container finishes so Airflow can continue the worker shutdown process!
extraContainers: []
# Add additional init containers into workers.
extraInitContainers: []
# Mount additional volumes into worker.
extraVolumes: []
extraVolumeMounts: []
# Select certain nodes for airflow worker pods.
nodeSelector: {}
priorityClassName: ~
affinity: {}
tolerations: []
topologySpreadConstraints: []
# hostAliases to use in worker pods.
# See: https://kubernetes.io/docs/concepts/services-networking/add-entries-to-pod-etc-hosts-with-host-aliases/
hostAliases: []
# - ip: "127.0.0.2"
# hostnames:
# - "test.hostname.one"
# - ip: "127.0.0.3"
# hostnames:
# - "test.hostname.two"
# annotations for the worker resource
annotations: {}
podAnnotations: {}
# Labels specific to workers objects and pods
labels: {}
logGroomerSidecar:
# Whether to deploy the Airflow worker log groomer sidecar.
enabled: true
# Command to use when running the Airflow worker log groomer sidecar (templated).
command: ~
# Args to use when running the Airflow worker log groomer sidecar (templated).
args: ["bash", "/clean-logs"]
# Number of days to retain logs
retentionDays: 15
resources: {}
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
waitForMigrations:
env: []
env: []
# Airflow scheduler settings
scheduler:
# If the scheduler stops heartbeating for 5 minutes (5*60s) kill the
# scheduler and let Kubernetes restart it
livenessProbe:
initialDelaySeconds: 10
timeoutSeconds: 20
failureThreshold: 5
periodSeconds: 60
command: ~
# Airflow 2.0 allows users to run multiple schedulers,
# However this feature is only recommended for MySQL 8+ and Postgres
replicas: 1
# Max number of old replicasets to retain
revisionHistoryLimit: ~
# Command to use when running the Airflow scheduler (templated).
command: ~
# Args to use when running the Airflow scheduler (templated).
args: ["bash", "-c", "exec airflow scheduler"]
# Update Strategy when scheduler is deployed as a StatefulSet
# (when using LocalExecutor and workers.persistence)
updateStrategy: ~
# Update Strategy when scheduler is deployed as a Deployment
# (when not using LocalExecutor and workers.persistence)
strategy: ~
# When not set, the values defined in the global securityContext will be used
securityContext: {}
# runAsUser: 50000
# fsGroup: 0
# runAsGroup: 0
# Create ServiceAccount
serviceAccount:
# Specifies whether a ServiceAccount should be created
create: true
# The name of the ServiceAccount to use.
# If not set and create is true, a name is generated using the release name
name: ~
# Annotations to add to scheduler kubernetes service account.
annotations: {}
# Scheduler pod disruption budget
podDisruptionBudget:
enabled: false
# PDB configuration
config:
maxUnavailable: 1
resources: {}
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
# This setting tells kubernetes that it's ok to evict
# when it wants to scale a node down.
safeToEvict: true
# Launch additional containers into scheduler.
extraContainers: []
# Add additional init containers into scheduler.
extraInitContainers: []
# Mount additional volumes into scheduler.
extraVolumes: []
extraVolumeMounts: []
# Select certain nodes for airflow scheduler pods.
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []
priorityClassName: ~
# annotations for scheduler deployment
annotations: {}
podAnnotations: {}
# Labels specific to scheduler objects and pods
labels: {}
logGroomerSidecar:
# Whether to deploy the Airflow scheduler log groomer sidecar.
enabled: true
# Command to use when running the Airflow scheduler log groomer sidecar (templated).
command: ~
# Args to use when running the Airflow scheduler log groomer sidecar (templated).
args: ["bash", "/clean-logs"]
# Number of days to retain logs
retentionDays: 15
resources: {}
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
waitForMigrations:
# Whether to create init container to wait for db migrations
enabled: true
env: []
env: []
# Airflow create user job settings
createUserJob:
# Command to use when running the create user job (templated).
command: ~
# Args to use when running the create user job (templated).
args:
- "bash"
- "-c"
# The format below is necessary to get `helm lint` happy
- |-
exec \
airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "users create" "create_user" }} "$@"
- --
- "-r"
- "{{ .Values.webserver.defaultUser.role }}"
- "-u"
- "{{ .Values.webserver.defaultUser.username }}"
- "-e"
- "{{ .Values.webserver.defaultUser.email }}"
- "-f"
- "{{ .Values.webserver.defaultUser.firstName }}"
- "-l"
- "{{ .Values.webserver.defaultUser.lastName }}"
- "-p"
- "{{ .Values.webserver.defaultUser.password }}"
# Annotations on the create user job pod
annotations: {}
# jobAnnotations are annotations on the create user job
jobAnnotations: {}
# Labels specific to createUserJob objects and pods
labels: {}
# When not set, the values defined in the global securityContext will be used
securityContext: {}
# runAsUser: 50000
# fsGroup: 0
# runAsGroup: 0
# Create ServiceAccount
serviceAccount:
# Specifies whether a ServiceAccount should be created
create: true
# The name of the ServiceAccount to use.
# If not set and create is true, a name is generated using the release name
name: ~
# Annotations to add to create user kubernetes service account.
annotations: {}
# Launch additional containers into user creation job
extraContainers: []
# Mount additional volumes into user creation job
extraVolumes: []
extraVolumeMounts: []
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []
# In case you need to disable the helm hooks that create the jobs after install.
# Disable this if you are using ArgoCD for example
useHelmHooks: true
applyCustomEnv: true
env: []
resources: {}
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
# Airflow database migration job settings
migrateDatabaseJob:
enabled: true
# Command to use when running the migrate database job (templated).
command: ~
# Args to use when running the migrate database job (templated).
args:
- "bash"
- "-c"
# The format below is necessary to get `helm lint` happy
- |-
exec \
airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "db upgrade" "upgradedb" }}
# Annotations on the database migration pod
annotations: {}
# jobAnnotations are annotations on the database migration job
jobAnnotations: {}
# When not set, the values defined in the global securityContext will be used
securityContext: {}
# runAsUser: 50000
# fsGroup: 0
# runAsGroup: 0
# Create ServiceAccount
serviceAccount:
# Specifies whether a ServiceAccount should be created
create: true
# The name of the ServiceAccount to use.
# If not set and create is true, a name is generated using the release name
name: ~
# Annotations to add to migrate database job kubernetes service account.
annotations: {}
resources: {}
# Launch additional containers into database migration job
extraContainers: []
# Mount additional volumes into database migration job
extraVolumes: []
extraVolumeMounts: []
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []
# In case you need to disable the helm hooks that create the jobs after install.
# Disable this if you are using ArgoCD for example
useHelmHooks: true
applyCustomEnv: true
# Airflow webserver settings
webserver:
allowPodLogReading: true
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 30
failureThreshold: 20
periodSeconds: 5
scheme: HTTP
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 30
failureThreshold: 20
periodSeconds: 5
scheme: HTTP
# Number of webservers
replicas: 1
# Max number of old replicasets to retain
revisionHistoryLimit: ~
# Command to use when running the Airflow webserver (templated).
command: ~
# Args to use when running the Airflow webserver (templated).
args: ["bash", "-c", "exec airflow webserver"]
# Create ServiceAccount
serviceAccount:
# Specifies whether a ServiceAccount should be created
create: true
# The name of the ServiceAccount to use.
# If not set and create is true, a name is generated using the release name
name: ~
# Annotations to add to webserver kubernetes service account.
annotations: {}
# Webserver pod disruption budget
podDisruptionBudget:
enabled: false
# PDB configuration
config:
maxUnavailable: 1
# Allow overriding Update Strategy for Webserver
strategy: ~
# When not set, the values defined in the global securityContext will be used
securityContext: {}
# runAsUser: 50000
# fsGroup: 0
# runAsGroup: 0
# Additional network policies as needed (Deprecated - renamed to `webserver.networkPolicy.ingress.from`)
extraNetworkPolicies: []
networkPolicy:
ingress:
# Peers for webserver NetworkPolicy ingress
from: []
# Ports for webserver NetworkPolicy ingress (if `from` is set)
ports:
- port: "{{ .Values.ports.airflowUI }}"
resources: {}
# Create initial user.
defaultUser:
enabled: true
role: Admin
username: admin
email: [email protected]
firstName: admin
lastName: user
password: admin
# Launch additional containers into webserver.
extraContainers: []
# Add additional init containers into webserver.
extraInitContainers: []
# Mount additional volumes into webserver.
extraVolumes: []
extraVolumeMounts: []
# This string (can be templated) will be mounted into the Airflow Webserver
# as a custom webserver_config.py. You can bake a webserver_config.py in to
# your image instead or specify a configmap containing the
# webserver_config.py.
webserverConfig: ~
# webserverConfig: |
# from airflow import configuration as conf
# # The SQLAlchemy connection string.
# SQLALCHEMY_DATABASE_URI = conf.get('database', 'SQL_ALCHEMY_CONN')
# # Flask-WTF flag for CSRF
# CSRF_ENABLED = True
webserverConfigConfigMapName: ~
service:
type: ClusterIP
## service annotations
annotations: {}
ports:
- name: airflow-ui
port: "{{ .Values.ports.airflowUI }}"
loadBalancerIP: ~
## Limit load balancer source ips to list of CIDRs
# loadBalancerSourceRanges:
# - "10.123.0.0/16"
loadBalancerSourceRanges: []
# Select certain nodes for airflow webserver pods.
nodeSelector: {}
priorityClassName: ~
affinity: {}
tolerations: []
topologySpreadConstraints: []
# annotations for webserver deployment
annotations: {}
podAnnotations: {}
# Labels specific to webserver app
labels: {}
waitForMigrations:
# Whether to create init container to wait for db migrations
enabled: true
env: []
env: []
# Airflow Triggerer Config
triggerer:
enabled: true
# Number of airflow triggerers in the deployment
replicas: 1
# Max number of old replicasets to retain
revisionHistoryLimit: ~
# Command to use when running Airflow triggerers (templated).
command: ~
# Args to use when running Airflow triggerer (templated).
args: ["bash", "-c", "exec airflow triggerer"]
# Update Strategy for triggerers
strategy:
rollingUpdate:
maxSurge: "100%"
maxUnavailable: "50%"
# If the triggerer stops heartbeating for 5 minutes (5*60s) kill the
# triggerer and let Kubernetes restart it
livenessProbe:
initialDelaySeconds: 10
timeoutSeconds: 20
failureThreshold: 5
periodSeconds: 60
command: ~
# Create ServiceAccount
serviceAccount:
# Specifies whether a ServiceAccount should be created
create: true
# The name of the ServiceAccount to use.
# If not set and create is true, a name is generated using the release name
name: ~
# Annotations to add to triggerer kubernetes service account.
annotations: {}
# When not set, the values defined in the global securityContext will be used
securityContext: {}
# runAsUser: 50000
# fsGroup: 0
# runAsGroup: 0
resources: {}
# Grace period for triggerer to finish after SIGTERM is sent from kubernetes
terminationGracePeriodSeconds: 60
# This setting tells kubernetes that it's ok to evict
# when it wants to scale a node down.
safeToEvict: true
# Launch additional containers into triggerer.
extraContainers: []
# Add additional init containers into triggerers.
extraInitContainers: []
# Mount additional volumes into triggerer.
extraVolumes: []
extraVolumeMounts: []
# Select certain nodes for airflow triggerer pods.
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []
priorityClassName: ~
# annotations for the triggerer deployment
annotations: {}
podAnnotations: {}
# Labels specific to triggerer objects and pods
labels: {}
waitForMigrations:
# Whether to create init container to wait for db migrations
enabled: true
env: []
env: []
# Airflow Dag Processor Config
dagProcessor:
enabled: false
# Number of airflow dag processors in the deployment
replicas: 1
# Max number of old replicasets to retain
revisionHistoryLimit: ~
# Command to use when running Airflow dag processors (templated).
command: ~
# Args to use when running Airflow dag processor (templated).
args: ["bash", "-c", "exec airflow dag-processor"]
# Update Strategy for dag processors
strategy:
rollingUpdate:
maxSurge: "100%"
maxUnavailable: "50%"
# If the dag processor stops heartbeating for 5 minutes (5*60s) kill the
# dag processor and let Kubernetes restart it
livenessProbe:
initialDelaySeconds: 10
timeoutSeconds: 20
failureThreshold: 5
periodSeconds: 60
command: ~
# Create ServiceAccount
serviceAccount:
# Specifies whether a ServiceAccount should be created
create: true
# The name of the ServiceAccount to use.
# If not set and create is true, a name is generated using the release name
name: ~
# Annotations to add to dag processor kubernetes service account.
annotations: {}
# When not set, the values defined in the global securityContext will be used
securityContext: {}
# runAsUser: 50000
# fsGroup: 0
# runAsGroup: 0
resources: {}
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
# Grace period for dag processor to finish after SIGTERM is sent from kubernetes
terminationGracePeriodSeconds: 60
# This setting tells kubernetes that it's ok to evict
# when it wants to scale a node down.
safeToEvict: true
# Launch additional containers into dag processor.
extraContainers: []
# Add additional init containers into dag processors.
extraInitContainers: []
# Mount additional volumes into dag processor.
extraVolumes: []
extraVolumeMounts: []
# Select certain nodes for airflow dag processor pods.
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []
priorityClassName: ~
# annotations for the dag processor deployment
annotations: {}
podAnnotations: {}
waitForMigrations:
# Whether to create init container to wait for db migrations
enabled: true
env: []
env: []
# Flower settings
flower:
# Enable flower.
# If True, and using CeleryExecutor/CeleryKubernetesExecutor, will deploy flower app.
enabled: false
# Max number of old replicasets to retain
revisionHistoryLimit: ~
# Command to use when running flower (templated).
command: ~
# Args to use when running flower (templated).
args:
- "bash"
- "-c"
# The format below is necessary to get `helm lint` happy
- |-
exec \
airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "celery flower" "flower" }}
# Additional network policies as needed (Deprecated - renamed to `flower.networkPolicy.ingress.from`)
extraNetworkPolicies: []
networkPolicy:
ingress:
# Peers for flower NetworkPolicy ingress
from: []
# Ports for flower NetworkPolicy ingress (if `from` is set)
ports:
- port: "{{ .Values.ports.flowerUI }}"
resources: {}
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
# When not set, the values defined in the global securityContext will be used
securityContext: {}
# runAsUser: 50000
# fsGroup: 0
# runAsGroup: 0
# Create ServiceAccount
serviceAccount:
# Specifies whether a ServiceAccount should be created
create: true
# The name of the ServiceAccount to use.
# If not set and create is true, a name is generated using the release name
name: ~
# Annotations to add to worker kubernetes service account.
annotations: {}
# A secret containing the connection
secretName: ~
# Else, if username and password are set, create secret from username and password
username: ~
password: ~
service:
type: ClusterIP
## service annotations
annotations: {}
ports:
- name: flower-ui
port: "{{ .Values.ports.flowerUI }}"
# To change the port used to access flower:
# ports:
# - name: flower-ui
# port: 8080
# targetPort: flower-ui
loadBalancerIP: ~
## Limit load balancer source ips to list of CIDRs
# loadBalancerSourceRanges:
# - "10.123.0.0/16"
loadBalancerSourceRanges: []
# Launch additional containers into the flower pods.
extraContainers: []
# Mount additional volumes into the flower pods.
extraVolumes: []
extraVolumeMounts: []
# Select certain nodes for airflow flower pods.
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []
priorityClassName: ~
# annotations for the flower deployment
annotations: {}
podAnnotations: {}
# Labels specific to flower objects and pods
labels: {}
env: []
# StatsD settings
statsd:
enabled: true
# Max number of old replicasets to retain
revisionHistoryLimit: ~
# Arguments for StatsD exporter command.
args: ["--statsd.mapping-config=/etc/statsd-exporter/mappings.yml"]
# Create ServiceAccount
serviceAccount:
# Specifies whether a ServiceAccount should be created
create: true
# The name of the ServiceAccount to use.
# If not set and create is true, a name is generated using the release name
name: ~
# Annotations to add to worker kubernetes service account.
annotations: {}
uid: 65534
# When not set, `statsd.uid` will be used
securityContext: {}
# runAsUser: 65534
# fsGroup: 0
# runAsGroup: 0
# Additional network policies as needed
extraNetworkPolicies: []
resources: {}
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
service:
extraAnnotations: {}
# Select certain nodes for StatsD pods.
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []
priorityClassName: ~
# Additional mappings for StatsD exporter.
# If set, will merge default mapping and extra mappings, default mapping has higher priority.
# So, if you want to change some default mapping, please use `overrideMappings`
extraMappings: []
# Override mappings for StatsD exporter.
# If set, the default mappings and `extraMappings` are ignored.
# So, if you use it, make sure it contains every mapping item you need.
overrideMappings: []
podAnnotations: {}
# PgBouncer settings
pgbouncer:
# Enable PgBouncer
enabled: false
# Number of PgBouncer replicas to run in Deployment
replicas: 1
# Max number of old replicasets to retain
revisionHistoryLimit: ~
# Command to use for PgBouncer(templated).
command: ["pgbouncer", "-u", "nobody", "/etc/pgbouncer/pgbouncer.ini"]
# Args to use for PgBouncer(templated).
args: ~
auth_type: md5
auth_file: /etc/pgbouncer/users.txt
# annotations to be added to the PgBouncer deployment
annotations: {}
# Create ServiceAccount
serviceAccount:
# Specifies whether a ServiceAccount should be created
create: true
# The name of the ServiceAccount to use.
# If not set and create is true, a name is generated using the release name
name: ~
# Annotations to add to worker kubernetes service account.
annotations: {}
# Additional network policies as needed
extraNetworkPolicies: []
# Pool sizes
metadataPoolSize: 10
resultBackendPoolSize: 5
# Maximum clients that can connect to PgBouncer (higher = more file descriptors)
maxClientConn: 100
configSecretName: ~
# PgBouncer pod disruption budget
podDisruptionBudget:
enabled: false
# PDB configuration
config:
maxUnavailable: 1
resources: {}
service:
extraAnnotations: {}
# https://www.pgbouncer.org/config.html
verbose: 0
logDisconnections: 0
logConnections: 0
sslmode: "prefer"
ciphers: "normal"
ssl:
ca: ~
cert: ~
key: ~
# Add extra PgBouncer ini configuration in the databases section:
# https://www.pgbouncer.org/config.html#section-databases
extraIniMetadata: ~
extraIniResultBackend: ~
# Add extra general PgBouncer ini configuration: https://www.pgbouncer.org/config.html
extraIni: ~
# Mount additional volumes into pgbouncer.
extraVolumes: []
extraVolumeMounts: []
# Select certain nodes for PgBouncer pods.
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []
priorityClassName: ~
uid: 65534
metricsExporterSidecar:
resources: {}
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
sslmode: "disable"
# Configuration for the redis provisioned by the chart
redis:
enabled: true
terminationGracePeriodSeconds: 600
# Create ServiceAccount
serviceAccount:
# Specifies whether a ServiceAccount should be created
create: true
# The name of the ServiceAccount to use.
# If not set and create is true, a name is generated using the release name
name: ~
# Annotations to add to worker kubernetes service account.
annotations: {}
persistence:
# Enable persistent volumes
enabled: true
# Volume size for worker StatefulSet
size: 1Gi
# If using a custom storageClass, pass name ref to all statefulSets here
storageClassName:
# Annotations to add to redis volumes
annotations: {}
resources: {}
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
# If set use as redis secret. Make sure to also set data.brokerUrlSecretName value.
passwordSecretName: ~
# Else, if password is set, create secret with it,
# Otherwise a new password will be generated on install
# Note: password can only be set during install, not upgrade.
password: ~
# This setting tells kubernetes that it's ok to evict
# when it wants to scale a node down.
safeToEvict: true
# Select certain nodes for redis pods.
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []
# Set to 0 for backwards-compatibility
uid: 0
# If not set, `redis.uid` will be used
securityContext: {}
# runAsUser: 999
# runAsGroup: 0
podAnnotations: {}
# Auth secret for a private registry
# This is used if pulling airflow images from a private registry
registry:
secretName: ~
# Example:
# connection:
# user: ~
# pass: ~
# host: ~
# email: ~
connection: {}
# Elasticsearch logging configuration
elasticsearch:
# Enable elasticsearch task logging
enabled: false
# A secret containing the connection
secretName: ~
# Or an object representing the connection
# Example:
# connection:
# user: ~
# pass: ~
# host: ~
# port: ~
connection: {}
# All ports used by chart
ports:
flowerUI: 5555
airflowUI: 8080
workerLogs: 8793
redisDB: 6379
statsdIngest: 9125
statsdScrape: 9102
pgbouncer: 6543
pgbouncerScrape: 9127
# Define any ResourceQuotas for namespace
quotas: {}
# Define default/max/min values for pods and containers in namespace
limits: []
# This runs as a CronJob to cleanup old pods.
cleanup:
enabled: false
# Run every 15 minutes
schedule: "*/15 * * * *"
# Command to use when running the cleanup cronjob (templated).
command: ~
# Args to use when running the cleanup cronjob (templated).
args: ["bash", "-c", "exec airflow kubernetes cleanup-pods --namespace={{ .Release.Namespace }}"]
# Select certain nodes for airflow cleanup pods.
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []
podAnnotations: {}
# Labels specific to cleanup objects and pods
labels: {}
resources: {}
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
# Create ServiceAccount
serviceAccount:
# Specifies whether a ServiceAccount should be created
create: true
# The name of the ServiceAccount to use.
# If not set and create is true, a name is generated using the release name
name: ~
# Annotations to add to cleanup cronjob kubernetes service account.
annotations: {}
# When not set, the values defined in the global securityContext will be used
securityContext: {}
# runAsUser: 50000
# runAsGroup: 0
env: []
# Specify history limit
# When set, overwrite the default k8s number of successful and failed CronJob executions that are saved.
failedJobsHistoryLimit: ~
successfulJobsHistoryLimit: ~
# Configuration for postgresql subchart
# Not recommended for production
postgresql:
enabled: false
config:
core:
dags_folder: '{{ include "airflow_dags" . }}'
# This is ignored when used with the official Docker image
load_examples: 'False'
executor: '{{ .Values.executor }}'
# For Airflow 1.10, backward compatibility; moved to [logging] in 2.0
colored_console_log: 'False'
remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
logging:
remote_logging: 'True'
# colored_console_log: 'False'
logging_config_class: log_config.LOGGING_CONFIG
remote_log_conn_id: AIRFLOW_CONN_ADLS
metrics:
statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
statsd_port: 9125
statsd_prefix: airflow
statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
webserver:
enable_proxy_fix: 'True'
# For Airflow 1.10
rbac: 'True'
celery:
flower_url_prefix: '{{ .Values.ingress.flower.path }}'
worker_concurrency: 16
scheduler:
standalone_dag_processor: '{{ ternary "True" "False" .Values.dagProcessor.enabled }}'
# statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0
statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
statsd_port: 9125
statsd_prefix: airflow
statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
# `run_duration` included for Airflow 1.10 backward compatibility; removed in 2.0.
run_duration: 41460
elasticsearch:
json_format: 'True'
log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
elasticsearch_configs:
max_retries: 3
timeout: 30
retry_timeout: 'True'
kerberos:
keytab: '{{ .Values.kerberos.keytabPath }}'
reinit_frequency: '{{ .Values.kerberos.reinitFrequency }}'
principal: '{{ .Values.kerberos.principal }}'
ccache: '{{ .Values.kerberos.ccacheMountPath }}/{{ .Values.kerberos.ccacheFileName }}'
celery_kubernetes_executor:
kubernetes_queue: 'kubernetes'
kubernetes:
namespace: '{{ .Release.Namespace }}'
airflow_configmap: '{{ include "airflow_config" . }}'
airflow_local_settings_configmap: '{{ include "airflow_config" . }}'
pod_template_file: '{{ include "airflow_pod_template_file" . }}/pod_template_file.yaml'
worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
multi_namespace_mode: '{{ ternary "True" "False" .Values.multiNamespaceMode }}'
# Whether Airflow can launch workers and/or pods in multiple namespaces
# If true, it creates ClusterRole/ClusterRolebinding (with access to entire cluster)
multiNamespaceMode: false
podTemplate: ~
# Git sync
dags:
persistence:
# Enable persistent volume for storing dags
enabled: false
# Volume size for dags
size: 1Gi
# If using a custom storageClass, pass name here
storageClassName: azureblob-fuse-premium
# access mode of the persistent volume
accessMode: ReadWriteOnce
## the name of an existing PVC to use
existingClaim: airflow-dags
## optional subpath for dag volume mount
subPath: ~
gitSync:
enabled: True
# git repo clone url
# ssh example: [email protected]:apache/airflow.git
# https example: https://github.com/apache/airflow.git
repo: https://github.com/ahmadfarhan97/example.git
branch: main
rev: HEAD
depth: 1
# the number of consecutive failures allowed before aborting
maxFailures: 0
# subpath within the repo where dags are located
# should be "" if dags are at repo root
subPath: "dags"
credentialsSecret: git-credentials
wait: 5
containerName: git-sync
uid: 65533
# When not set, the values defined in the global securityContext will be used
securityContext: {}
# runAsUser: 65533
# runAsGroup: 0
extraVolumeMounts: []
env: []
resources: {}
logs:
persistence:
# Enable persistent volume for storing logs
enabled: false
# Volume size for logs
size: 100Gi
# If using a custom storageClass, pass name here
storageClassName:
## the name of an existing PVC to use
existingClaim: airflow-logs
```
### Docker Image customizations
```
FROM apache/airflow:2.5.3-python3.8
ENV PYTHONPATH "${PYTHONPATH}:${AIRFLOW_HOME}"
COPY requirements.txt .
COPY __init__.py ${AIRFLOW_HOME}/config/__init__.py
COPY log_config.py ${AIRFLOW_HOME}/config/log_config.py
RUN pip install -r requirements.txt
```
### What happened
I was trying to send task logs to Azure Blob Storage from a deployment on my local machine.
I followed the [Airflow documentation](https://airflow.apache.org/docs/apache-airflow-providers-microsoft-azure/stable/logging/index.html)
to set up the configuration in values.yaml, and I created a log_config.py file and copied it
into the Docker image (as shown in the Docker image customizations section). I keep getting
the following error in the airflow-run-airflow-migrations-xxxxx pod:
```
....................
ERROR! Maximum number of retries (20) reached.
Last check result:
$ airflow db check
Unable to load the config, contains a configuration error.
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/logging/config.py", line 563, in configure
    handler = self.configure_handler(handlers[name])
  File "/usr/local/lib/python3.8/logging/config.py", line 744, in configure_handler
    result = factory(**kwargs)
TypeError: __init__() missing 1 required positional argument: 'delete_local_copy'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 5, in <module>
    from airflow.__main__ import main
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__init__.py", line 64, in <module>
    settings.initialize()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/settings.py", line 570, in initialize
    LOGGING_CLASS_PATH = configure_logging()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/logging_config.py", line 74, in configure_logging
    raise e
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/logging_config.py", line 69, in configure_logging
    dictConfig(logging_config)
  File "/usr/local/lib/python3.8/logging/config.py", line 808, in dictConfig
    dictConfigClass(config).configure()
  File "/usr/local/lib/python3.8/logging/config.py", line 570, in configure
    raise ValueError('Unable to configure handler '
ValueError: Unable to configure handler 'task'
```
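Reading the traceback: `logging.config.dictConfig` builds each handler by calling its class with the keys of the handler entry as keyword arguments, so a required constructor argument that is absent from the entry surfaces as a `TypeError` wrapped in `ValueError: Unable to configure handler 'task'`. The sketch below reproduces that failure mode with the standard library only. `StrictHandler`, `build_config` and `try_configure` are hypothetical names used for illustration; `StrictHandler` is a stand-in, not the real `WasbTaskHandler`.

```python
import logging
import logging.config


class StrictHandler(logging.Handler):
    """Hypothetical stand-in for a handler with a required constructor argument."""

    def __init__(self, base_log_folder, delete_local_copy):
        super().__init__()
        self.base_log_folder = base_log_folder
        self.delete_local_copy = delete_local_copy


def build_config(handler_kwargs):
    """Return a minimal dictConfig dict whose 'task' handler gets the given kwargs."""
    return {
        "version": 1,
        "disable_existing_loggers": False,
        # "()" tells dictConfig to call this factory with the remaining keys.
        "handlers": {"task": {"()": StrictHandler, **handler_kwargs}},
    }


def try_configure(handler_kwargs):
    """Apply the config; return None on success, else the error and its cause."""
    try:
        logging.config.dictConfig(build_config(handler_kwargs))
    except ValueError as err:
        return f"{err} <- {err.__cause__!r}"
    return None


# All required arguments present: configuration succeeds.
print(try_configure({"base_log_folder": "/tmp/logs", "delete_local_copy": False}))

# One required argument missing: same shape of error as in the pod log.
print(try_configure({"base_log_folder": "/tmp/logs"}))
```

If the same mechanism applies here, the `task` handler entry in log_config.py and the constructor of the installed handler class disagree about `delete_local_copy`.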
### What you think should happen instead
The pods keep failing with this error.
### How to reproduce
The following script is the log_config.py file, which is copied into the image by the Dockerfile:
```
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Airflow logging settings."""
from __future__ import annotations

import os
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.exceptions import AirflowException

LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()

# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()

LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")

LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
    "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
)

COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT")
COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG")
COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS")

DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")

BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")
PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")

DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
    "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
)

# FILENAME_TEMPLATE only uses in Remote Logging Handlers since Airflow 2.3.3
# All of these handlers inherited from FileTaskHandler and providing any value rather than None
# would raise deprecation warning.
FILENAME_TEMPLATE: str | None = None

PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")

LOGGING_CONFIG: dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {
            "format": LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
        "airflow_coloured": {
            "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
        },
        "source_processor": {
            "format": DAG_PROCESSOR_LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
    },
    "filters": {
        "mask_secrets": {
            "()": "airflow.utils.log.secrets_masker.SecretsMasker",
        },
    },
    "handlers": {
        "console": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow_coloured",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
            "filters": ["mask_secrets"],
        },
        "processor": {
            "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
            "filename_template": PROCESSOR_FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
        "processor_to_stdout": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "source_processor",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
    },
    "loggers": {
        "airflow.processor": {
            "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
        },
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
            "filters": ["mask_secrets"],
        },
        "flask_appbuilder": {
            "handlers": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets"],
    },
}

EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handlers": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    LOGGING_CONFIG["loggers"].update(new_loggers)

DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
    "handlers": {
        "processor_manager": {
            "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
            "formatter": "airflow",
            "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            "mode": "a",
            "maxBytes": 104857600,  # 100MB
            "backupCount": 5,
        }
    },
    "loggers": {
        "airflow.processor_manager": {
            "handlers": ["processor_manager"],
            "level": LOG_LEVEL,
            "propagate": False,
        }
    },
}

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
    LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
    LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])

    # Manually create log directory for processor_manager handler as RotatingFileHandler
    # will only create file but not the directory.
    processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
        "processor_manager"
    ]
    directory: str = os.path.dirname(processor_manager_handler_config["filename"])
    Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)

##################
# Remote logging #
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")

if REMOTE_LOGGING:
    ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")

    # Storage bucket URL for remote logging
    # S3 buckets should start with "s3://"
    # Cloudwatch log groups should start with "cloudwatch://"
    # GCS buckets should start with "gs://"
    # WASB buckets should start with "wasb"
    # just to help Airflow select correct handler
    REMOTE_BASE_LOG_FOLDER: str = 'wasb://[email protected]'
    REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})

    if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
        S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "s3_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
        url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
        CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "log_group_arn": url_parts.netloc + url_parts.path,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
                "gcp_key_path": key_path,
            },
        }
        LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
        WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
                "wasb_container": "airflow-logs",
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        # stackdriver:///airflow-tasks => airflow-tasks
        log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
        STACKDRIVER_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
                "formatter": "airflow",
                "name": log_name,
                "gcp_key_path": key_path,
            }
        }
        LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
        OSS_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
                "oss_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
    elif ELASTICSEARCH_HOST:
        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

        ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "filename_template": FILENAME_TEMPLATE,
                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
                "host": ELASTICSEARCH_HOST,
                "frontend": ELASTICSEARCH_FRONTEND,
                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
                "json_format": ELASTICSEARCH_JSON_FORMAT,
                "json_fields": ELASTICSEARCH_JSON_FIELDS,
                "host_field": ELASTICSEARCH_HOST_FIELD,
                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
            },
        }
        LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
    else:
        raise AirflowException(
            "Incorrect remote log configuration. Please check the configuration of option 'host' in "
            "section 'elasticsearch' if you are using Elasticsearch. In the other case, "
            "'remote_base_log_folder' option in the 'logging' section."
        )
    LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)
```
An empty __init__.py file is also copied into the same directory in the image.
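The traceback names `delete_local_copy` as the missing argument, and the `wasb` branch of log_config.py does not pass it. One possible workaround, assuming the installed `apache-airflow-providers-microsoft-azure` version declares `delete_local_copy` as a required parameter of `WasbTaskHandler` (suggested by the traceback, not verified here), would be to add the key to the handler entry. The folder values below are placeholders rather than the real configuration:

```python
# Sketch of the "task" handler entry for the wasb branch of log_config.py.
# Assumption: the installed Azure provider requires delete_local_copy.
WASB_REMOTE_HANDLERS = {
    "task": {
        "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
        "formatter": "airflow",
        "base_log_folder": "/opt/airflow/logs",  # placeholder for BASE_LOG_FOLDER
        "wasb_log_folder": "wasb-airflow-logs",  # placeholder for REMOTE_BASE_LOG_FOLDER
        "wasb_container": "airflow-logs",
        "filename_template": None,
        # The argument the traceback reports as missing.
        "delete_local_copy": False,
    },
}
```

Whether this is sufficient depends on which provider version is installed in the image, so it should be checked against that version's handler signature.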
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)