phi-friday commented on issue #35263:
URL: https://github.com/apache/airflow/issues/35263#issuecomment-1784896325

   # update
   ## How to reproduce
   ### docker-compose.yaml
   ```yaml
   ---
   version: '3.8'
   x-airflow-common:
     &airflow-common
     image: apache/airflow:2.7.2-python3.11
     environment:
       &airflow-common-env
       AIRFLOW__CORE__EXECUTOR: CeleryExecutor
       AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: 
postgresql+psycopg2://airflow:airflow@postgres/airflow
       AIRFLOW__CORE__SQL_ALCHEMY_CONN: 
postgresql+psycopg2://airflow:airflow@postgres/airflow
       AIRFLOW__CELERY__RESULT_BACKEND: 
db+postgresql://airflow:airflow@postgres/airflow
       AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
       AIRFLOW__CORE__FERNET_KEY: ''
       AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
       AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
       AIRFLOW__API__AUTH_BACKENDS: 
'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
       AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
     volumes:
       - ./dags:/opt/airflow/dags
       - ./logs:/opt/airflow/logs
       - ./config:/opt/airflow/config
       - ./plugins:/opt/airflow/plugins
     user: ${AIRFLOW_UID}:0
     depends_on:
       &airflow-common-depends-on
       redis:
         condition: service_healthy
       postgres:
         condition: service_healthy
   
   services:
     postgres:
       image: postgres:13
       environment:
         POSTGRES_USER: airflow
         POSTGRES_PASSWORD: airflow
         POSTGRES_DB: airflow
       volumes:
         - postgres-db-volume:/var/lib/postgresql/data
       healthcheck:
         test: ["CMD", "pg_isready", "-U", "airflow"]
         interval: 10s
         retries: 5
         start_period: 5s
       restart: always
   
     redis:
       image: redis:latest
       expose:
         - 6379
       healthcheck:
         test: ["CMD", "redis-cli", "ping"]
         interval: 10s
         timeout: 30s
         retries: 50
         start_period: 30s
       restart: always
   
     airflow-webserver:
       <<: *airflow-common
       command: webserver
       ports:
         - "8080:8080"
       healthcheck:
         test: ["CMD", "curl", "--fail", "http://localhost:8080/health";]
         interval: 30s
         timeout: 10s
         retries: 5
         start_period: 30s
       restart: always
       depends_on:
         <<: *airflow-common-depends-on
         airflow-init:
           condition: service_completed_successfully
   
     airflow-scheduler:
       <<: *airflow-common
       command: scheduler
       healthcheck:
         test: ["CMD", "curl", "--fail", "http://localhost:8974/health";]
         interval: 30s
         timeout: 10s
         retries: 5
         start_period: 30s
       restart: always
       depends_on:
         <<: *airflow-common-depends-on
         airflow-init:
           condition: service_completed_successfully
   
     airflow-worker:
       <<: *airflow-common
       command: celery worker
       healthcheck:
         # yamllint disable rule:line-length
         test:
           - "CMD-SHELL"
           - 'celery --app 
airflow.providers.celery.executors.celery_executor.app inspect ping -d 
"celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app 
inspect ping -d "celery@$${HOSTNAME}"'
         interval: 30s
         timeout: 10s
         retries: 5
         start_period: 30s
       environment:
         <<: *airflow-common-env
         DUMB_INIT_SETSID: "0"
       restart: always
       depends_on:
         <<: *airflow-common-depends-on
         airflow-init:
           condition: service_completed_successfully
   
     airflow-triggerer:
       <<: *airflow-common
       command: triggerer
       healthcheck:
         test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob 
--hostname "$${HOSTNAME}"']
         interval: 30s
         timeout: 10s
         retries: 5
         start_period: 30s
       restart: always
       depends_on:
         <<: *airflow-common-depends-on
         airflow-init:
           condition: service_completed_successfully
   
     airflow-init:
       <<: *airflow-common
       entrypoint: /bin/bash
       # yamllint disable rule:line-length
       command:
         - -c
         - |
           function ver() {
             printf "%04d%04d%04d%04d" $${1//./ }
           }
           airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu 
airflow airflow version)
           airflow_version_comparable=$$(ver $${airflow_version})
           min_airflow_version=2.2.0
           min_airflow_version_comparable=$$(ver $${min_airflow_version})
           if (( airflow_version_comparable < min_airflow_version_comparable 
)); then
             echo
             echo -e "\033[1;31mERROR!!!: Too old Airflow version 
$${airflow_version}!\e[0m"
             echo "The minimum Airflow version supported: 
$${min_airflow_version}. Only use this or higher!"
             echo
             exit 1
           fi
           one_meg=1048576
           mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / 
one_meg))
           cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
           disk_available=$$(df / | tail -1 | awk '{print $$4}')
           warning_resources="false"
           if (( mem_available < 4000 )) ; then
             echo
             echo -e "\033[1;33mWARNING!!!: Not enough memory available for 
Docker.\e[0m"
             echo "At least 4GB of memory required. You have $$(numfmt --to iec 
$$((mem_available * one_meg)))"
             echo
             warning_resources="true"
           fi
           if (( cpus_available < 2 )); then
             echo
             echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for 
Docker.\e[0m"
             echo "At least 2 CPUs recommended. You have $${cpus_available}"
             echo
             warning_resources="true"
           fi
           if (( disk_available < one_meg * 10 )); then
             echo
             echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for 
Docker.\e[0m"
             echo "At least 10 GBs recommended. You have $$(numfmt --to iec 
$$((disk_available * 1024 )))"
             echo
             warning_resources="true"
           fi
           if [[ $${warning_resources} == "true" ]]; then
             echo
             echo -e "\033[1;33mWARNING!!!: You have not enough resources to 
run Airflow (see above)!\e[0m"
             echo "Please follow the instructions to increase amount of 
resources available:"
             echo "   
https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin";
             echo
           fi
           mkdir -p /sources/logs /sources/dags /sources/plugins
           chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
           exec /entrypoint airflow version
       # yamllint enable rule:line-length
       environment:
         <<: *airflow-common-env
         _AIRFLOW_DB_MIGRATE: 'true'
         _AIRFLOW_WWW_USER_CREATE: 'true'
         _AIRFLOW_WWW_USER_USERNAME: airflow
         _AIRFLOW_WWW_USER_PASSWORD: airflow
         _PIP_ADDITIONAL_REQUIREMENTS: ''
       user: "0:0"
       volumes:
         - .:/sources
   
     docker-socket-proxy:
       image: tecnativa/docker-socket-proxy:0.1.1
       container_name: airflow-socket
       environment:
         CONTAINERS: 1
         IMAGES: 1
         AUTH: 1
         POST: 1
       privileged: true
       volumes:
         - /var/run/docker.sock:/var/run/docker.sock:ro
       restart: always
   
   volumes:
     postgres-db-volume:
   
   ```
   
   ### dags/test_dag.py
   ```python
   # test_dag.py
   from __future__ import annotations
   
   from os import environ
   
   from airflow.decorators import dag, task
   from pendulum.datetime import DateTime
   from pendulum.tz import local_timezone
   
   DEFAULT_ARGS = {
       "image": "python:3.11-slim-bullseye",
       "api_version": "auto",
       "docker_url": "TCP://docker-socket-proxy:2375",
       "auto_remove": "force",
       "mount_tmp_dir": False,
       "container_name": "pickle_error_test",
       "user": environ["AIRFLOW_UID"],
   }
   
   
   @task.python()
   def no_error() -> None:
       import logging
   
       logger = logging.getLogger("airflow.task")
       logger.info("in celery")
   
   
   @task.docker()
   def pickle_error() -> None:
       import logging
   
       logger = logging.getLogger("airflow.task")
       logger.info("in docker")
   
   
   @dag(
       start_date=DateTime.now(local_timezone()).replace(
           hour=0, minute=0, second=0, microsecond=0
       ),
       schedule=None,
       default_args=DEFAULT_ARGS | {"do_xcom_push": False},
       catchup=False,
   )
   def test_docker_task_error() -> None:
       in_celery = no_error()
       in_docker = pickle_error()
       # Removing the following line, no error occurs.
       _ = in_celery >> in_docker
   
   
   test_docker_task_error()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to