GitHub user m-fariz-a created a discussion: Docker decorator push xcom error
I am new to using the Docker decorator in Airflow for isolation. My team decided to isolate task environments in Docker containers for portability, since our Airflow itself also runs inside a container. I am not sure how to fix this error, but I suspect it occurs when the container task pushes its return value to XCom. The error does not happen on every run, and I have not been able to pin down what condition actually triggers it.
This error happened while executing the task `get_data_survey()`:
```
[2026-02-05 09:15:26] INFO - DAG bundles loaded: data-engineer-2025, projek_engineer
[2026-02-05 09:15:26] INFO - Filling up the DagBag from /app/projek_engineer/airflow-dags/dag-etl/myproject2025.py
[2026-02-05 09:15:32] INFO - Starting docker container from image localhost/projek_engineer-myproject2025:latest
[2026-02-05 09:15:32] INFO - + python3 -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x);'
[2026-02-05 09:15:32] INFO - + python3 -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x);'
[2026-02-05 09:15:32] INFO - + python3 /tmp/script.py /tmp/script.in /tmp/script.out none /tmp/script.out
[2026-02-05 09:16:17] INFO - downloading metadata myplatform-myproject2025
[2026-02-05 09:16:17] INFO - downloading dataset myplatform-myproject2025 offset=425
[2026-02-05 09:16:17] INFO - downloading dataset myplatform-myproject2025 offset=426
[2026-02-05 09:16:19] ERROR - Task failed with exception
ReadError: file could not be opened successfully:
- method gz: ReadError('empty file')
- method bz2: ReadError('not a bzip2 file')
- method xz: ReadError('not an lzma file')
- method tar: ReadError('empty file')
```
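If I read the traceback right, this message comes from Python's `tarfile` module being handed an empty stream: `tarfile.open` in transparent mode probes each compression method in turn and reports every failure. A minimal sketch that reproduces the same signature (my assumption about where the message originates; on recent Python versions the per-method list is included):
```python
# Reproduce the ReadError signature by opening an empty archive stream.
import io
import tarfile

try:
    # transparent mode "r:*" probes gz, bz2, xz, then plain tar
    tarfile.open(fileobj=io.BytesIO(b""), mode="r:*")
except tarfile.ReadError as exc:
    print(exc)
# file could not be opened successfully:
# - method gz: ReadError('empty file')
# - method bz2: ReadError('not a bzip2 file')
# - method xz: ReadError('not an lzma file')
# - method tar: ReadError('empty file')
```
So whatever the operator tried to open as a tar archive appears to have been empty.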
For comparison, this is the log from a run with no error; it proceeds past the same download steps and reaches `Pushing xcom`:
```
[2026-02-05 09:18:26] INFO - DAG bundles loaded: data-engineer-2025, projek_engineer
[2026-02-05 09:18:26] INFO - Filling up the DagBag from /app/projek_engineer/airflow-dags/dag-etl/myproject2025.py
[2026-02-05 09:18:32] INFO - Starting docker container from image localhost/projek_engineer-myproject2025:latest
[2026-02-05 09:18:32] INFO - + python3 -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x);'
[2026-02-05 09:18:32] INFO - + python3 -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x);'
[2026-02-05 09:18:32] INFO - + python3 /tmp/script.py /tmp/script.in /tmp/script.out none /tmp/script.out
[2026-02-05 09:19:17] INFO - downloading metadata myplatform-myproject2025
[2026-02-05 09:19:17] INFO - downloading dataset myplatform-myproject2025 offset=425
[2026-02-05 09:19:17] INFO - downloading dataset myplatform-myproject2025 offset=426
[2026-02-05 09:19:20] INFO - Pushing xcom ti=RuntimeTaskInstance(id=UUID('019c2af0-0af0-7e7b-bb66-5b74d4064511'), task_id='get_data_survey', dag_id='myproject2025', run_id='scheduled__2026-02-04T23:15:00+00:00', try_number=1, dag_version_id=UUID('019c236b-da90-7cc2-b35e-5087fda54754'), map_index=0, hostname='systemd-airflow', context_carrier={}, task=<Task(_DockerDecoratedOperator): get_data_survey>, bundle_instance=LocalDagBundle(name=projek_engineer), max_tries=0, start_date=datetime.datetime(2026, 2, 4, 23, 15, 24, 625965, tzinfo=datetime.timezone.utc), end_date=None, state=<TaskInstanceState.RUNNING: 'running'>, is_mapped=True, rendered_map_index=None)
```
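From the `+ python3 -c 'import base64, os; ...'` lines, the decorated callable and its input are shipped into the container as base64 environment variables, the script writes its result to `/tmp/script.out`, and the operator then retrieves that file from the container. As far as I can tell, that retrieval goes over the Docker API, which returns files as a tar stream; a rough sketch with the docker SDK (placeholder container id, not the actual provider code):
```python
import io
import tarfile

import docker

client = docker.APIClient(base_url="unix://var/run/docker.sock")
container_id = "..."  # the finished task container (placeholder)

# get_archive returns (chunk iterator, stat) for a path inside the container
stream, stat = client.get_archive(container_id, "/tmp/script.out")
archive = io.BytesIO(b"".join(stream))

# If the stream is empty at this point, tarfile.open raises exactly the
# ReadError shown in the failing run.
with tarfile.open(fileobj=archive) as tar:
    member = tar.getmembers()[0]
    result_bytes = tar.extractfile(member).read()  # serialized XCom value
```
That would fit the intermittent failures: if the fetched archive stream comes back empty (for instance because the container is already being cleaned up), the XCom push step fails with this exact error.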
This is the DAG code:
```python
import pendulum, os, time
from dotenv import load_dotenv, dotenv_values
from docker.types import Mount
from airflow.sdk import dag, task, Asset
from airflow.exceptions import AirflowSkipException

# this is our custom notifier
from discord_notification import discord_notification

env_prod = '/app/projek_engineer/.env.prod'
env_custom = '/app/projek_engineer/.env'
load_dotenv(env_prod)
load_dotenv(env_custom, override=True)

# shared kwargs for every @task.docker task
docker_args = {
    'container_name': "{{ dag.dag_id }}-{{ ti.task_id }}-{{ macros.datetime.utcnow().strftime('%Y%m%dT%H%M%S') }}-{{ ti.map_index }}",
    'image': 'localhost/projek_engineer-myproject2025:latest',
    'working_dir': '/app/pipeline',
    'mounts': [
        Mount(source=os.getenv('HOST_PROJECT_ROOT') + '/pipeline',
              target='/app/pipeline', type='bind'),
        Mount(source=os.getenv('HOST_PROJECT_ROOT') + '/airflow-dags/custom_module',
              target='/app/airflow_libs/custom_module/custom_module', type='bind'),
    ],
    'mount_tmp_dir': True,
    'environment': {
        'PYTHONPATH': '/app/pipeline:/app/airflow_libs/custom_module',
        **dotenv_values(env_prod),
        **dotenv_values(env_custom),
    },
    'network_mode': os.getenv('DOCKER_NETWORK_MODE'),
    'auto_remove': 'success',
    'docker_url': 'unix://var/run/docker.sock',
    'mem_limit': '5g',
}

# DAG definition using the TaskFlow API
@dag(
    dag_id="myproject2025",
    start_date=pendulum.parse("2025-11-08T00:00:00+07:00"),
    end_date=pendulum.parse("2026-02-28T00:00:00+07:00"),
    schedule="15 6-18/3 * * *",
    catchup=False,
    max_active_runs=1,
    tags=['survey', 'etl', '2025'],
    default_args={
        "owner": "data-engineer",
        'on_failure_callback': discord_notification,
        'skip_on_exit_code': 99,
    },
    description="Processing download",
)
def get_data_process():
    @task
    def prepare_download_config():
        from custom_module.config_reader import download_config
        config_path = "/app/projek_engineer/pipeline/klien/myproject2025/config/default.yaml"
        return download_config(config_path)

    @task.docker(**docker_args)
    def check_status_survey(project_config):
        import time
        from custom_module.check_status_survey import check_status_survey
        status_survey = check_status_survey(project_config)
        time.sleep(2)
        return status_survey

    @task.docker(max_active_tis_per_dagrun=2, **docker_args)
    def get_data_survey(survey_info):
        import time
        from custom_module.download_survey import download_survey
        dsurvey = download_survey(survey_info, host_docker_endpoint=True)
        time.sleep(2)
        return dsurvey

    download_config = prepare_download_config()
    status_surveys = check_status_survey(project_config=download_config)
    download_data = get_data_survey.expand(survey_info=status_surveys)


get_data_process()
```
As I mentioned earlier, the error does not happen on every run. Sometimes the DAG runs without any error, sometimes the error occurs in `check_status_survey()`, and sometimes in one of the dynamically mapped `get_data_survey()` tasks. I have tried `mount_tmp_dir=False` and `auto_remove='never'`, and the error still happens occasionally. `check_status_survey()` pushes a nested dictionary to XCom, while `get_data_survey()` pushes a flat dictionary with four key-value pairs. Retrying the failed task sometimes clears the error, and adding a retry mechanism (see the sketch below) helps sometimes but is not a reliable fix.
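For reference, the variants I tried look roughly like this (illustrative values; each reuses `docker_args` from the DAG above):
```python
from datetime import timedelta

# variants of the shared docker kwargs that I tested
docker_args_no_tmp = {**docker_args, "mount_tmp_dir": False}  # skip the /tmp bind mount
docker_args_keep = {**docker_args, "auto_remove": "never"}    # keep finished containers around

# retry mechanism, added to the DAG's default_args (retries/retry_delay are
# standard Airflow task arguments; the numbers here are illustrative)
default_args_with_retries = {
    "owner": "data-engineer",
    "retries": 2,
    "retry_delay": timedelta(minutes=2),
}
```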
GitHub link: https://github.com/apache/airflow/discussions/61487