SulimanLab opened a new issue #13680:
URL: https://github.com/apache/airflow/issues/13680


   **Apache Airflow version**: 2.0.0
   
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): v1.19.4
   
   **What happened**:
   I get this error when I try to execute tasks using Kubernetes:
   
   ```
   [2021-01-14 19:39:17,628] {dagbag.py:440} INFO - Filling up the DagBag from /opt/airflow/dags/repo/bash.py
   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
       args.func(args)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 89, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 216, in task_run
       dag = get_dag(args.subdir, args.dag_id)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 189, in get_dag
       'parse.'.format(dag_id)
   airflow.exceptions.AirflowException: dag_id could not be found: bash. Either the dag did not exist or it failed to parse.
   
   ```
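
The exception covers two different cases: the DAG file is missing inside the worker pod, or importing it raised an error. A minimal sketch for telling the two apart from a shell in the worker container follows. It assumes that importing the file as a plain Python module roughly approximates what Airflow does when it parses a DAG file, which is only partly true. The helper name `try_import_dag_file` is hypothetical and not part of Airflow; the path is the one from the log line above.

```python
import importlib.util
import traceback
from pathlib import Path
from typing import Optional


def try_import_dag_file(path: str) -> Optional[str]:
    """Import a DAG file as a plain Python module (hypothetical helper).

    Returns None when the import succeeds, otherwise the formatted traceback.
    """
    module_name = f"_dag_check_{Path(path).stem}"
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        return f"Cannot create an import spec for {path}"
    module = importlib.util.module_from_spec(spec)
    try:
        # Runs the module's top-level code, so a missing import or a
        # syntax error surfaces here.
        spec.loader.exec_module(module)
    except Exception:
        return traceback.format_exc()
    return None


if __name__ == "__main__":
    # Path taken from the "Filling up the DagBag" log line in this report.
    dag_path = "/opt/airflow/dags/repo/bash.py"
    if not Path(dag_path).exists():
        print(f"{dag_path} does not exist in this container")
    else:
        error = try_import_dag_file(dag_path)
        print(error or f"{dag_path} imported without errors")
```

If the file exists and imports cleanly, the problem is more likely in how the worker pod receives the DAGs (for example the git-sync mount) than in the DAG code itself.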
   
   **What you expected to happen**:
   The task should get executed and terminate.
   
   **How to reproduce it**:
   Deploy the Airflow Helm chart using this values.yaml:
   
   ```
   # Licensed to the Apache Software Foundation (ASF) under one
   # or more contributor license agreements.  See the NOTICE file
   # distributed with this work for additional information
   # regarding copyright ownership.  The ASF licenses this file
   # to you under the Apache License, Version 2.0 (the
   # "License"); you may not use this file except in compliance
   # with the License.  You may obtain a copy of the License at
   #
   #   http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing,
   # software distributed under the License is distributed on an
   # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   # KIND, either express or implied.  See the License for the
   # specific language governing permissions and limitations
   # under the License.
   ---
   # Default values for airflow.
   # This is a YAML-formatted file.
   # Declare variables to be passed into your templates.
   
   # User and group of airflow user
   uid: 50000
   gid: 50000
   
   # Airflow home directory
   # Used for mount paths
   airflowHome: "/opt/airflow"
   
   # Default airflow repository -- overrides all the specific images below
   defaultAirflowRepository: apache/airflow
   
   # Default airflow tag to deploy
   defaultAirflowTag: 2.0.0
   
   
   # Select certain nodes for airflow pods.
   nodeSelector: { }
   affinity: { }
   tolerations: [ ]
   
   # Add common labels to all objects and pods defined in this chart.
   labels: { }
   
   # Ingress configuration
   ingress:
     # Enable ingress resource
     enabled: false
   
     # Configs for the Ingress of the web Service
     web:
       # Annotations for the web Ingress
       annotations: { }
   
       # The path for the web Ingress
       path: ""
   
       # The hostname for the web Ingress
       host: ""
   
       # configs for web Ingress TLS
       tls:
         # Enable TLS termination for the web Ingress
         enabled: false
         # the name of a pre-created Secret containing a TLS private key and certificate
         secretName: ""
   
       # HTTP paths to add to the web Ingress before the default path
       precedingPaths: [ ]
   
       # Http paths to add to the web Ingress after the default path
       succeedingPaths: [ ]
   
     # Configs for the Ingress of the flower Service
     flower:
       # Annotations for the flower Ingress
       annotations: { }
   
       # The path for the flower Ingress
       path: ""
   
       # The hostname for the flower Ingress
       host: ""
   
       # configs for flower Ingress TLS
       tls:
         # Enable TLS termination for the flower Ingress
         enabled: false
         # the name of a pre-created Secret containing a TLS private key and certificate
         secretName: ""
   
       # HTTP paths to add to the flower Ingress before the default path
       precedingPaths: [ ]
   
       # Http paths to add to the flower Ingress after the default path
       succeedingPaths: [ ]
   
   # Network policy configuration
   networkPolicies:
     # Enabled network policies
     enabled: false
   
   
   # Extra annotations to apply to all
   # Airflow pods
   airflowPodAnnotations: { }
   
   # Enable RBAC (default on most clusters these days)
   rbacEnabled: true
   
   # Airflow executor
   # Options: SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor
   executor: "KubernetesExecutor"
   
   # If this is true and using LocalExecutor/SequentialExecutor/KubernetesExecutor, the scheduler's
   # service account will have access to communicate with the api-server and launch pods.
   # If this is true and using the CeleryExecutor, the workers will be able to launch pods.
   allowPodLaunching: true
   
   # Images
   images:
     airflow:
       repository: ~
       tag: ~
       pullPolicy: IfNotPresent
     pod_template:
       repository: ~
       tag: ~
       pullPolicy: IfNotPresent
     flower:
       repository: ~
       tag: ~
       pullPolicy: IfNotPresent
     statsd:
       repository: apache/airflow
       tag: airflow-statsd-exporter-2020.09.05-v0.17.0
       pullPolicy: IfNotPresent
     redis:
       repository: redis
       tag: 6-buster
       pullPolicy: IfNotPresent
     pgbouncer:
       repository: apache/airflow
       tag: airflow-pgbouncer-2020.09.05-1.14.0
       pullPolicy: IfNotPresent
     pgbouncerExporter:
       repository: apache/airflow
       tag: airflow-pgbouncer-exporter-2020.09.25-0.5.0
       pullPolicy: IfNotPresent
     gitSync:
       repository: k8s.gcr.io/git-sync
       tag: v3.1.6
       pullPolicy: IfNotPresent
   
   # Environment variables for all airflow containers
   env:
     - name: "AIRFLOW__KUBERNETES__GIT_SYNC_RUN_AS_USER"
       value: "65533"
   
   # Secrets for all airflow containers
   secret: [ ]
   # - envName: ""
   #   secretName: ""
   #   secretKey: ""
   
   # Extra secrets that will be managed by the chart
   # (You can use them with extraEnv or extraEnvFrom or some of the extraVolumes values).
   # The format is "key/value" where
   #    * key (can be templated) is the name of the secret that will be created
   #    * value: an object with the standard 'data' or 'stringData' key (or both).
   #          The value associated with those keys must be a string (can be templated)
   extraSecrets: { }
   # eg:
   # extraSecrets:
   #   {{ .Release.Name }}-airflow-connections:
   #     data: |
   #       AIRFLOW_CONN_GCP: 'base64_encoded_gcp_conn_string'
   #       AIRFLOW_CONN_AWS: 'base64_encoded_aws_conn_string'
   #     stringData: |
   #       AIRFLOW_CONN_OTHER: 'other_conn'
   #   {{ .Release.Name }}-other-secret-name-suffix: |
   #     data: |
   #        ...
   
   # Extra ConfigMaps that will be managed by the chart
   # (You can use them with extraEnv or extraEnvFrom or some of the extraVolumes values).
   # The format is "key/value" where
   #    * key (can be templated) is the name of the configmap that will be created
   #    * value: an object with the standard 'data' key.
   #          The value associated with this key must be a string (can be templated)
   extraConfigMaps: { }
   # eg:
   # extraConfigMaps:
   #   {{ .Release.Name }}-airflow-variables:
   #     data: |
   #       AIRFLOW_VAR_HELLO_MESSAGE: "Hi!"
   #       AIRFLOW_VAR_KUBERNETES_NAMESPACE: "{{ .Release.Namespace }}"
   
   # Extra env 'items' that will be added to the definition of airflow containers
   # a string is expected (can be templated).
   extraEnv: ~
   # eg:
   # extraEnv: |
   #   - name: PLATFORM
   #     value: FR
   
   # Extra envFrom 'items' that will be added to the definition of airflow containers
   # A string is expected (can be templated).
   extraEnvFrom: ~
   # eg:
   # extraEnvFrom: |
   #   - secretRef:
   #       name: '{{ .Release.Name }}-airflow-connections'
   #   - configMapRef:
   #       name: '{{ .Release.Name }}-airflow-variables'
   
   # Airflow database config
   data:
     # If secret names are provided, use those secrets
     metadataSecretName: ~
     resultBackendSecretName: ~
   
     # Otherwise pass connection values in
     metadataConnection:
       user: postgres
       pass: postgres
       host: ~
       port: 5432
       db: postgres
       sslmode: disable
     resultBackendConnection:
       user: postgres
       pass: postgres
       host: ~
       port: 5432
       db: postgres
       sslmode: disable
   
   # Fernet key settings
   fernetKey: ~
   fernetKeySecretName: ~
   
   
   # In order to use kerberos you need to create a secret containing the keytab file
   # The secret name should follow the naming convention of the application where resources are
   # named {{ .Release-name }}-<POSTFIX>. In case of the keytab file, the postfix is "kerberos-keytab"
   # So if your release is named "my-release" the name of the secret should be "my-release-kerberos-keytab"
   #
   # The Keytab content should be available in the "kerberos.keytab" key of the secret.
   #
   #  apiVersion: v1
   #  kind: Secret
   #  data:
   #    kerberos.keytab: <base64_encoded keytab file content>
   #  type: Opaque
   #
   #
   #  If you have such keytab file you can do it with similar
   #
   #  kubectl create secret generic {{ .Release.name }}-kerberos-keytab --from-file=kerberos.keytab
   #
   kerberos:
     enabled: false
     ccacheMountPath: '/var/kerberos-ccache'
     ccacheFileName: 'cache'
     configPath: '/etc/krb5.conf'
     keytabPath: '/etc/airflow.keytab'
     principal: '[email protected]'
     reinitFrequency: 3600
     config: |
       # This is an example config showing how you can use templating and how "example" config
       # might look like. It works with the test kerberos server that we are using during integration
       # testing at Apache Airflow (see `scripts/ci/docker-compose/integration-kerberos.yml` but in
       # order to make it production-ready you must replace it with your own configuration that
       # Matches your kerberos deployment. Administrators of your Kerberos instance should
       # provide the right configuration.
   
       [logging]
       default = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_libs.log"
       kdc = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_kdc.log"
       admin_server = "FILE:{{ template "airflow_logs_no_quote" . }}/kadmind.log"
   
       [libdefaults]
       default_realm = FOO.COM
       ticket_lifetime = 10h
       renew_lifetime = 7d
       forwardable = true
   
       [realms]
       FOO.COM = {
         kdc = kdc-server.foo.com
         admin_server = admin_server.foo.com
       }
   
   # Airflow Worker Config
   workers:
     # Number of airflow celery workers in StatefulSet
     replicas: 1
   
     # Allow KEDA autoscaling.
     # Persistence.enabled must be set to false to use KEDA.
     keda:
       enabled: false
       namespaceLabels: { }
   
       # How often KEDA polls the airflow DB to report new scale requests to the HPA
       pollingInterval: 5
   
       # How many seconds KEDA will wait before scaling to zero.
       # Note that HPA has a separate cooldown period for scale-downs
       cooldownPeriod: 30
   
       # Maximum number of workers created by keda
       maxReplicaCount: 10
   
     persistence:
       # Enable persistent volumes
       enabled: true
       # Volume size for worker StatefulSet
       size: 100Gi
       # If using a custom storageClass, pass name ref to all statefulSets here
       storageClassName:
       # Execute init container to chown log directory.
       # This is currently only needed in KinD, due to usage
       # of local-path provisioner.
       fixPermissions: false
   
     kerberosSidecar:
       # Enable kerberos sidecar
       enabled: false
   
     resources: { }
     #  limits:
     #   cpu: 100m
     #   memory: 128Mi
     #  requests:
     #   cpu: 100m
     #   memory: 128Mi
   
     # Grace period for tasks to finish after SIGTERM is sent from kubernetes
     terminationGracePeriodSeconds: 600
   
     # This setting tells kubernetes that its ok to evict
     # when it wants to scale a node down.
     safeToEvict: true
     # Annotations to add to worker kubernetes service account.
     serviceAccountAnnotations: { }
     # Mount additional volumes into worker.
     extraVolumes: [ ]
     extraVolumeMounts: [ ]
   
   # Airflow scheduler settings
   scheduler:
     # Airflow 2.0 allows users to run multiple schedulers,
     # However this feature is only recommended for MySQL 8+ and Postgres
     replicas: 1
     # Scheduler pod disruption budget
     podDisruptionBudget:
       enabled: false
   
       # PDB configuration
       config:
         maxUnavailable: 1
   
     resources: { }
     #  limits:
     #   cpu: 100m
     #   memory: 128Mi
     #  requests:
     #   cpu: 100m
     #   memory: 128Mi
   
     # This setting can overwrite
     # podMutation settings.
     airflowLocalSettings: ~
   
     # This setting tells kubernetes that its ok to evict
     # when it wants to scale a node down.
     safeToEvict: true
   
     # Annotations to add to scheduler kubernetes service account.
     serviceAccountAnnotations: { }
   
     # Mount additional volumes into scheduler.
     extraVolumes: [ ]
     extraVolumeMounts: [ ]
   
   # Airflow webserver settings
   webserver:
     allowPodLogReading: true
     livenessProbe:
       initialDelaySeconds: 15
       timeoutSeconds: 30
       failureThreshold: 20
       periodSeconds: 5
   
     readinessProbe:
       initialDelaySeconds: 15
       timeoutSeconds: 30
       failureThreshold: 20
       periodSeconds: 5
   
     # Number of webservers
     replicas: 1
   
     # Additional network policies as needed
     extraNetworkPolicies: [ ]
   
     resources: { }
     #   limits:
     #     cpu: 100m
     #     memory: 128Mi
     #   requests:
     #     cpu: 100m
     #     memory: 128Mi
   
     # Create initial user.
     defaultUser:
       enabled: true
       role: Admin
       username: admin
       email: [email protected]
       firstName: admin
       lastName: user
       password: admin
   
     # Mount additional volumes into webserver.
     extraVolumes: [ ]
     #    - name: airflow-ui
     #      emptyDir: { }
     extraVolumeMounts: [ ]
     #    - name: airflow-ui
     #      mountPath: /opt/airflow
   
     # This will be mounted into the Airflow Webserver as a custom
     # webserver_config.py. You can bake a webserver_config.py in to your image
     # instead
     webserverConfig: ~
     # webserverConfig: |
     #   from airflow import configuration as conf
   
     #   # The SQLAlchemy connection string.
     #   SQLALCHEMY_DATABASE_URI = conf.get('core', 'SQL_ALCHEMY_CONN')
   
     #   # Flask-WTF flag for CSRF
     #   CSRF_ENABLED = True
   
     service:
       type: NodePort
       ## service annotations
       annotations: { }
   
     # Annotations to add to webserver kubernetes service account.
     serviceAccountAnnotations: { }
   
   # Flower settings
   flower:
     # Additional network policies as needed
     extraNetworkPolicies: [ ]
     resources: { }
     #   limits:
     #     cpu: 100m
     #     memory: 128Mi
     #   requests:
     #     cpu: 100m
     #     memory: 128Mi
   
     # A secret containing the connection
     secretName: ~
   
     # Else, if username and password are set, create secret from username and password
     username: ~
     password: ~
   
     service:
       type: ClusterIP
   
   # Statsd settings
   statsd:
     enabled: true
     # Additional network policies as needed
     extraNetworkPolicies: [ ]
     resources: { }
     #   limits:
     #     cpu: 100m
     #     memory: 128Mi
     #   requests:
     #     cpu: 100m
     #     memory: 128Mi
   
     service:
       extraAnnotations: { }
   
   # Pgbouncer settings
   pgbouncer:
     # Enable pgbouncer
     enabled: false
     # Additional network policies as needed
     extraNetworkPolicies: [ ]
   
     # Pool sizes
     metadataPoolSize: 10
     resultBackendPoolSize: 5
   
     # Maximum clients that can connect to pgbouncer (higher = more file descriptors)
     maxClientConn: 100
   
     # Pgbouncer pod disruption budget
     podDisruptionBudget:
       enabled: false
   
       # PDB configuration
       config:
         maxUnavailable: 1
   
     # Limit the resources to pgbouncerExported.
     # When you specify the resource request the scheduler uses this information to decide which node to place
     # the Pod on. When you specify a resource limit for a Container, the kubelet enforces those limits so
     # that the running container is not allowed to use more of that resource than the limit you set.
     # See: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
     # Example:
     #
     # resource:
     #   limits:
     #     cpu: 100m
     #     memory: 128Mi
     #   requests:
     #     cpu: 100m
     #     memory: 128Mi
     resources: { }
   
     service:
       extraAnnotations: { }
   
     # https://www.pgbouncer.org/config.html
     verbose: 0
     logDisconnections: 0
     logConnections: 0
   
     sslmode: "prefer"
     ciphers: "normal"
   
     ssl:
       ca: ~
       cert: ~
       key: ~
   
   redis:
     terminationGracePeriodSeconds: 600
   
     persistence:
       # Enable persistent volumes
       enabled: true
       # Volume size for worker StatefulSet
       size: 1Gi
       # If using a custom storageClass, pass name ref to all statefulSets here
       storageClassName:
   
     resources: { }
     #  limits:
     #   cpu: 100m
     #   memory: 128Mi
     #  requests:
     #   cpu: 100m
     #   memory: 128Mi
   
     # If set use as redis secret
     passwordSecretName: ~
     brokerURLSecretName: ~
   
     # Else, if password is set, create secret with it,
     # else generate a new one on install
     password: ~
   
     # This setting tells kubernetes that its ok to evict
     # when it wants to scale a node down.
     safeToEvict: true
   
   # Auth secret for a private registry
   # This is used if pulling airflow images from a private registry
   registry:
     secretName: ~
   
     # Example:
     # connection:
     #   user: ~
     #   pass: ~
     #   host: ~
     #   email: ~
     connection: { }
   
   # Elasticsearch logging configuration
   elasticsearch:
     # Enable elasticsearch task logging
     enabled: true
     # A secret containing the connection
     #  secretName: ~
     # Or an object representing the connection
     # Example:
     connection:
       #     user:
       #     pass:
       host: elasticsearch-master-headless.elk.svc.cluster.local
       port: 9200
   #  connection: {}
   
   
   # All ports used by chart
   ports:
     flowerUI: 5555
     airflowUI: 8080
     workerLogs: 8793
     redisDB: 6379
     statsdIngest: 9125
     statsdScrape: 9102
     pgbouncer: 6543
     pgbouncerScrape: 9127
   
   # Define any ResourceQuotas for namespace
   quotas: { }
   
   # Define default/max/min values for pods and containers in namespace
   limits: [ ]
   
   # This runs as a CronJob to cleanup old pods.
   cleanup:
     enabled: false
     # Run every 15 minutes
     schedule: "*/15 * * * *"
   
   # Configuration for postgresql subchart
   # Not recommended for production
   postgresql:
     enabled: true
     postgresqlPassword: postgres
     postgresqlUsername: postgres
   
   # Config settings to go into the mounted airflow.cfg
   #
   # Please note that these values are passed through the `tpl` function, so are
   # all subject to being rendered as go templates. If you need to include a
   # literal `{{` in a value, it must be expressed like this:
   #
   #    a: '{{ "{{ not a template }}" }}'
   #
   # yamllint disable rule:line-length
   config:
     core:
       dags_folder: '{{ include "airflow_dags" . }}'
       load_examples: 'False'
       executor: '{{ .Values.executor }}'
       # For Airflow 1.10, backward compatibility
       colored_console_log: 'True'
       remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
     # Authentication backend used for the experimental API
     api:
       auth_backend: airflow.api.auth.backend.deny_all
     logging:
       remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
       colored_console_log: 'True'
       logging_level: INFO
     metrics:
       statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
       statsd_port: 9125
       statsd_prefix: airflow
       statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
     webserver:
       enable_proxy_fix: 'True'
       expose_config: 'True'
       rbac: 'True'
     celery:
       default_queue: celery
     scheduler:
       scheduler_heartbeat_sec: 5
       # For Airflow 1.10, backward compatibility
       statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
       statsd_port: 9125
       statsd_prefix: airflow
       statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
       # Restart Scheduler every 41460 seconds (11 hours 31 minutes)
       # The odd time is chosen so it is not always restarting on the same "hour" boundary
       run_duration: 41460
     elasticsearch:
       json_format: 'True'
       log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
     elasticsearch_configs:
       max_retries: 3
       timeout: 30
       retry_timeout: 'True'
     kerberos:
       keytab: '{{ .Values.kerberos.keytabPath }}'
       reinit_frequency: '{{ .Values.kerberos.reinitFrequency }}'
       principal: '{{ .Values.kerberos.principal }}'
       ccache: '{{ .Values.kerberos.ccacheMountPath }}/{{ .Values.kerberos.ccacheFileName }}'
     kubernetes:
       namespace: '{{ .Release.Namespace }}'
       airflow_configmap: '{{ include "airflow_config" . }}'
       airflow_local_settings_configmap: '{{ include "airflow_config" . }}'
       pod_template_file: '{{ include "airflow_pod_template_file" . }}/pod_template_file.yaml'
       worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
       worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
       delete_worker_pods: 'False'
       multi_namespace_mode: '{{ if .Values.multiNamespaceMode }}True{{ else }}False{{ end }}'
   # yamllint enable rule:line-length
   
   multiNamespaceMode: false
   
   podTemplate:
   
   # Git sync
   dags:
     persistence:
       # Enable persistent volume for storing dags
       enabled: false
       # Volume size for dags
       size: 1Gi
       # If using a custom storageClass, pass name here
       storageClassName: gp2
       # access mode of the persistent volume
       accessMode: ReadWriteMany
       ## the name of an existing PVC to use
       existingClaim: "airflow-dags"
     gitSync:
       enabled: true
       repo: [email protected]:Tikna-inc/airflow.git
       branch: main
       rev: HEAD
       root: "/git"
       dest: "repo"
       depth: 1
       maxFailures: 0
       subPath: ""
       sshKeySecret: airflow-ssh-secret
       wait: 60
       containerName: git-sync
       uid: 65533
   ```
   
   **And this is the DAG with its task:**
   
   ```
   
   
   import logging  # needed by the logging.getLogger() call below
   from datetime import timedelta
   
   import requests
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.utils.dates import days_ago
   
   logging.getLogger().setLevel(level=logging.INFO)
   
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['[email protected]'],
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5),
   }
   
   
   def get_active_customers():
       # requests needs an explicit URL scheme
       requests.get("http://localhost:8080")
   
   
   dag = DAG(
       'bash',
       default_args=default_args,
       description='A simple test DAG',
       schedule_interval='*/2 * * * *',
       start_date=days_ago(1),
       tags=['Test'],
       is_paused_upon_creation=False,
       catchup=False
   )
   
   t1 = BashOperator(
       task_id='print_date',
       bash_command='mkdir ./itsMe',
       dag=dag
   )
   
   t1
   ```
   
   

