phi-friday opened a new issue, #35289: URL: https://github.com/apache/airflow/issues/35289
### Apache Airflow version

2.7.2

### What happened

See https://github.com/apache/airflow/discussions/35285.

The error occurs when the `__dict__` of `_DockerDecoratedOperator` is copied, which in turn copies its `pickling_library` attribute:
https://github.com/apache/airflow/blob/main/airflow/providers/docker/decorators/docker.py#L92

In that code the `pickle` module object itself is assigned to `pickling_library`, and a module cannot be copied or pickled. `pickling_library` should therefore be changed to something serializable (for example the module's name) and the module should be loaded lazily. A standalone sketch of the failure and of that idea is shown below.
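To make the mechanism concrete, here is a minimal, self-contained sketch. It is not Airflow code — the file and class names (`HoldsModule`, `HoldsModuleName`) are made up — and it only illustrates why a module-valued attribute breaks `copy.deepcopy` and what "store something serializable and import lazily" could look like:

```python
# pickling_library_sketch.py -- hypothetical demo, not part of Airflow
from __future__ import annotations

import copy
import importlib
import pickle


class HoldsModule:
    """Mimics an operator whose __dict__ holds the pickle module directly."""

    def __init__(self) -> None:
        # The module object itself lands in __dict__, so deepcopying the
        # instance has to deepcopy the module and fails.
        self.pickling_library = pickle


class HoldsModuleName:
    """Stores only the module *name*; the module is imported lazily on access."""

    def __init__(self) -> None:
        self._pickling_library_name = "pickle"  # a plain string is safe to copy

    @property
    def pickling_library(self):
        # importlib.import_module is cheap after the first import (sys.modules cache).
        return importlib.import_module(self._pickling_library_name)


if __name__ == "__main__":
    try:
        copy.deepcopy(HoldsModule())
    except TypeError as exc:
        # -> cannot pickle 'module' object
        print(f"deepcopy of a module-holding object fails: {exc}")

    clone = copy.deepcopy(HoldsModuleName())  # succeeds: only a string is copied
    print(clone.pickling_library.loads(clone.pickling_library.dumps({"ok": True})))
```

Whether the provider ends up storing a dotted module name, deferring the import, or customizing the copy behaviour is up to the maintainers; the sketch only shows why the current attribute cannot survive a deepcopy.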
### What you think should happen instead

_No response_

### How to reproduce

docker-compose.yaml

```yaml
---
version: '3.8'
x-airflow-common:
  &airflow-common
  image: apache/airflow:2.7.2-python3.11
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./config:/opt/airflow/config
    - ./plugins:/opt/airflow/plugins
  user: ${AIRFLOW_UID}:0
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      # yamllint disable rule:line-length
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: airflow
      _AIRFLOW_WWW_USER_PASSWORD: airflow
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - .:/sources

  docker-socket-proxy:
    image: tecnativa/docker-socket-proxy:0.1.1
    container_name: airflow-socket
    environment:
      CONTAINERS: 1
      IMAGES: 1
      AUTH: 1
      POST: 1
    privileged: true
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro
    restart: always

volumes:
  postgres-db-volume:
```

dag_test.py

```python
# test_dag.py
from __future__ import annotations

from os import environ

from airflow.decorators import dag, task
from pendulum.datetime import DateTime
from pendulum.tz import local_timezone

DEFAULT_ARGS = {
    "image": "python:3.11-slim-bullseye",
    "api_version": "auto",
    "docker_url": "TCP://docker-socket-proxy:2375",
    "auto_remove": "force",
    "mount_tmp_dir": False,
    "container_name": "pickle_error_test",
    "user": environ["AIRFLOW_UID"],
}


@task.python()
def no_error() -> None:
    import logging

    logger = logging.getLogger("airflow.task")
    logger.info("in celery")


@task.docker()
def pickle_error() -> None:
    import logging

    logger = logging.getLogger("airflow.task")
    logger.info("in docker")


@dag(
    start_date=DateTime.now(local_timezone()).replace(
        hour=0, minute=0, second=0, microsecond=0
    ),
    schedule=None,
    default_args=DEFAULT_ARGS | {"do_xcom_push": False},
    catchup=False,
)
def test_docker_task_error() -> None:
    in_celery = no_error()
    in_docker = pickle_error()

    # Removing the following line, no error occurs.
    _ = in_celery >> in_docker


test_docker_task_error()
```
### Operating System

Ubuntu 22.04.1 LTS

### Versions of Apache Airflow Providers

_No response_

### Deployment

Docker-Compose

### Deployment details

_No response_

### Anything else

I've never used `breeze` before, so I'm having trouble creating a PR right now. I'll look into it in a few hours and try to open a PR if I can figure it out.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
