phi-friday commented on issue #35263:
URL: https://github.com/apache/airflow/issues/35263#issuecomment-1784896325
# update
## How to reproduce
### docker-compose.yaml
```yaml
---
version: '3.8'

# Settings shared by every Airflow service below. Services pull them in with
# `<<: *airflow-common`; the environment and depends_on maps carry their own
# anchors so a service can extend them instead of replacing them.
x-airflow-common: &airflow-common
  image: apache/airflow:2.7.2-python3.11
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN:
      postgresql+psycopg2://airflow:airflow@postgres/airflow
    # NOTE(review): same value as AIRFLOW__DATABASE__SQL_ALCHEMY_CONN;
    # presumably kept for older Airflow releases that read the [core] option.
    AIRFLOW__CORE__SQL_ALCHEMY_CONN:
      postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND:
      db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKENDS:
      'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./config:/opt/airflow/config
    - ./plugins:/opt/airflow/plugins
  # Quoted, with a fallback UID: when AIRFLOW_UID is unset on the host the
  # bare form would render as ":0", which is not a usable user spec.
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on: &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

# Service definitions. In every command/healthcheck string below, `$$` is
# Compose's escape for a literal `$`, so the variable is expanded by the shell
# inside the container; a single `$` is interpolated by Compose on the host.
services:
  # Metadata database and Celery result backend.
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  # Celery broker.
  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      # Folded scalar: the lines below are joined with single spaces into one
      # shell command. It pings this worker through the provider's Celery app
      # and falls back to the app under airflow.executors if that fails.
      test:
        - "CMD-SHELL"
        - >-
          celery --app airflow.providers.celery.executors.celery_executor.app
          inspect ping -d "celery@$${HOSTNAME}" ||
          celery --app airflow.executors.celery_executor.app
          inspect ping -d "celery@$${HOSTNAME}"
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test:
        - "CMD-SHELL"
        - >-
          airflow jobs check --job-type TriggererJob
          --hostname "$${HOSTNAME}"
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  # One-shot bootstrap container. The other Airflow services wait for it via
  # `condition: service_completed_successfully`. The script checks the Airflow
  # version (exits 1 below 2.2.0), prints warnings when memory/CPU/disk look
  # too small, creates and chowns the mounted project directories, then hands
  # over to the image entrypoint.
  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # The script is a literal block scalar: every line break is passed to bash
    # as-is, so long commands must stay on one line.
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: airflow
      _AIRFLOW_WWW_USER_PASSWORD: airflow
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    # Runs as root so the chown in the script above can succeed.
    user: "0:0"
    volumes:
      - .:/sources

  # Proxy in front of the host Docker socket (mounted read-only). The test
  # DAG's `docker_url` points at this service name on port 2375.
  docker-socket-proxy:
    image: tecnativa/docker-socket-proxy:0.1.1
    container_name: airflow-socket
    # Values quoted so they stay strings, as environment variables are.
    environment:
      CONTAINERS: "1"
      IMAGES: "1"
      AUTH: "1"
      POST: "1"
    privileged: true
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro
    restart: always

volumes:
  # Named volume mounted by the postgres service at /var/lib/postgresql/data.
  postgres-db-volume: {}
```
### dags/test_dag.py
```python
# test_dag.py
from __future__ import annotations
from os import environ
from airflow.decorators import dag, task
from pendulum.datetime import DateTime
from pendulum.tz import local_timezone
# Operator arguments shared by the tasks of the DAG below; they are handed to
# the DAG as ``default_args`` (merged with ``do_xcom_push=False``).
DEFAULT_ARGS = {
    "image": "python:3.11-slim-bullseye",
    "api_version": "auto",
    # Service name and port of the docker-socket-proxy from the compose file.
    "docker_url": "TCP://docker-socket-proxy:2375",
    "auto_remove": "force",
    "mount_tmp_dir": False,
    "container_name": "pickle_error_test",
    # Fall back to "50000" instead of raising KeyError at import time when
    # AIRFLOW_UID is not exported in the environment that parses this file.
    # NOTE(review): 50000 is assumed to be the Airflow image's default UID.
    "user": environ.get("AIRFLOW_UID", "50000"),
}
@task.python()
def no_error() -> None:
    """Emit one INFO line on the ``airflow.task`` logger from a Python task."""
    import logging

    logging.getLogger("airflow.task").info("in celery")
@task.docker()
def pickle_error() -> None:
    """Emit one INFO line on the ``airflow.task`` logger from a Docker task."""
    # The import is kept inside the body, exactly as in the original repro.
    # NOTE(review): presumably required because the function is shipped into
    # the container on its own -- confirm before hoisting it.
    import logging

    logging.getLogger("airflow.task").info("in docker")
@dag(
    # Midnight today in the local timezone.
    start_date=DateTime.now(local_timezone()).replace(
        hour=0,
        minute=0,
        second=0,
        microsecond=0,
    ),
    schedule=None,
    default_args={**DEFAULT_ARGS, "do_xcom_push": False},
    catchup=False,
)
def test_docker_task_error() -> None:
    """Chain the plain Python task in front of the Docker task.

    Minimal reproduction: per the reporter, the error only appears when the
    two tasks are linked by the dependency below.
    """
    celery_step = no_error()
    docker_step = pickle_error()
    # Without this dependency line the error does not occur.
    _ = celery_step >> docker_step


# Calling the decorated function registers the DAG at import time.
test_docker_task_error()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org