nullhack opened a new issue #9164: URL: https://github.com/apache/airflow/issues/9164
**Apache Airflow version**: 1.10.10

**Environment**: apache-airflow==1.10.10, docker==4.2.1

**Problems**

* **Issue 1**: When `xcom_push=True` is enabled (and `xcom_all=False`), the output **is sometimes null** or is captured wrongly (see screenshots below).
* **Issue 2**: When `xcom_all=True`, a bytes string (`b'...'`) is stored as the XCom value. This makes the output harder to use in downstream operators, and it does not conform to other operators (e.g. BashOperator, which writes its output to XCom as a string; see screenshots below).
* **Issue 3**: `stderr` and `stdout` are written to the same XCom output. In practice, we don't want warnings and errors mixed into output that downstream operators will parse (but we do need them captured in the Airflow logs). Sending `stderr` to XCom can lead to undefined/non-deterministic behavior (see screenshots below).

**What you expected to happen**:

* **Issue 1**: As per the [documentation](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L107), if `xcom_push=True` and `xcom_all=False`, only the last line of the logs should be pushed to XCom.
* **Issue 2**: If `xcom_push=True` and `xcom_all=True`, I expect all log lines to be pushed to XCom. Right now DockerOperator encodes the data back [to bytes](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L262). This is not a good standard, because downstream operators need an extra transformation before they can use the value (see screenshots below).
* **Issue 3**: If warnings/errors are returned, I expect Airflow to log them, but not to mix them into XCom (see screenshots below).
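A minimal sketch of the expected XCom behavior for Issues 1 and 2, assuming the raw container output arrives as a list of bytes chunks (which is what docker-py's streaming `attach`/`logs` calls yield). `select_xcom_value` is a hypothetical helper, not DockerOperator's actual code: it decodes the output to `str` and returns either the last non-empty line or all non-empty lines.

```python
from typing import List, Optional


def select_xcom_value(chunks: List[bytes], xcom_all: bool) -> Optional[str]:
    """Hypothetical helper: turn raw container output into an XCom value.

    Returns text (like BashOperator does) instead of a b'...' bytes literal.
    """
    # Join before decoding so a multi-byte UTF-8 character split across
    # two chunks is still decoded correctly.
    text = b"".join(chunks).decode("utf-8", errors="replace")
    # Split into real lines; a single chunk may carry several lines.
    lines = [line for line in text.splitlines() if line.strip()]
    if not lines:
        return None
    if xcom_all:
        # Push every line, joined back into one string.
        return "\n".join(lines)
    # Push only the last line.
    return lines[-1]


# Example: chunks as they might arrive from a stream.
print(select_xcom_value([b"0\n1\n", b"2\n"], xcom_all=False))
```

Splitting on lines after decoding means a chunk such as `b'0\n1\n'` is no longer treated as a single "line", which is one plausible way to get the "captured wrongly" result in Issue 1. Dropping blank lines is a design choice of this sketch.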
**Testing code**

Airflow DAG to test XCom push and push-all:

```python
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

params = {
    'dag_id': 'test_xcom',
    'schedule_interval': '@daily',
    'catchup': True,
    'max_active_runs': 2,
    'default_args': {
        'owner': 'airflow',
        'start_date': days_ago(5),
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    },
}

# Prints 0..9 on stdout, then a logging warning on stderr.
PY_CMD = """python -c "import logging;[print(i) for i in range(10)];logging.warning('this is a warning')" """

with DAG(**params) as dag:
    # Expected XCom: only the last stdout line ('9').
    write_xcom_docker_warning = DockerOperator(
        task_id='write_xcom_docker_warning',
        image='python:3-slim',
        api_version='auto',
        command=PY_CMD,
        xcom_push=True,
        xcom_all=False,
        network_mode='bridge',
    )

    # Expected XCom: all output lines.
    write_xcom_docker_all = DockerOperator(
        task_id='write_xcom_docker_all',
        image='python:3-slim',
        api_version='auto',
        command=PY_CMD,
        xcom_push=True,
        xcom_all=True,
        network_mode='bridge',
    )

    # Pull the DockerOperator XCom into another DockerOperator...
    write_xcom_docker_pull = DockerOperator(
        task_id='write_xcom_docker_pull',
        image='ubuntu:20.04',
        api_version='auto',
        command="""echo {{ ti.xcom_pull(task_ids="write_xcom_docker_warning") }}""",
        xcom_push=True,
        xcom_all=False,
        network_mode='bridge',
    )

    # ...and into a BashOperator, for comparison.
    write_xcom_bash_pull = BashOperator(
        task_id='write_xcom_bash_pull',
        bash_command="""echo {{ ti.xcom_pull(task_ids="write_xcom_docker_warning") }}""",
        xcom_push=True,
    )

    # Report the Python type of each pulled XCom value. PythonOperator pushes
    # its return value to XCom by default, so it takes no xcom_push argument.
    type_data = PythonOperator(
        task_id='type_data',
        provide_context=True,
        python_callable=lambda **context: (
            'docker_xcom: '
            + type(context['task_instance'].xcom_pull(task_ids='write_xcom_docker_pull')).__name__
            + ' | bash_xcom: '
            + type(context['task_instance'].xcom_pull(task_ids='write_xcom_bash_pull')).__name__
        ),
    )

    write_xcom_docker_warning >> write_xcom_docker_all >> [write_xcom_docker_pull, write_xcom_bash_pull] >> type_data
```

A minimal example that reproduces null outputs from DockerOperator:

On normal runs it is sometimes hard to reproduce Issue 1 (null strings being returned). The code below is a minimal working example that emulates the sequence of steps performed by [DockerOperator (`_run_image`)](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L246); because the container's output is so short, it produces null strings more easily.

```python
from docker import APIClient

d = APIClient()

# Same steps as DockerOperator._run_image: create, start, attach, wait, logs.
c = d.create_container(
    image='ubuntu:20.04',
    name='TEST',
    command="""bash -c "echo 'test' && echo 'test2'" """,
    host_config=d.create_host_config(auto_remove=False, network_mode='bridge'),
)
d.start(c['Id'])

# attach() runs only after start(); output written before attaching is not
# replayed (logs=False by default), so this list can come back empty.
print([i for i in d.attach(c['Id'], stderr=True, stdout=True, stream=True)])

d.wait(c['Id'])

# logs() reads the output Docker stored for the container.
print([i for i in d.logs(c['Id'], stream=True, stdout=True, stderr=True)])

d.remove_container(c['Id'])
```

Note that in DockerOperator the [wait](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L255) call comes after the [attach](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L241).

**Sample Output**

* **Issue 1**:
  * Null returns: _[screenshot]_
  * Captured wrongly: _[screenshot]_ We would expect only the last line, `'9'`.
* **Issue 2**:
  * Does not conform with similar operators: _[screenshot]_
  * Harder to work with pull and bytes output: _[screenshot]_
* **Issue 3**:
  * `stderr` messes up the output: _[screenshot]_
  * Non-deterministic behavior: _[screenshot]_
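Given the attach-before-wait ordering in the reproduction above, one possible direction for Issues 1 and 3 is sketched here, assuming the container is created with `auto_remove=False` (as in that reproduction): wait for the container to exit first, then read `stdout` and `stderr` separately with docker-py's `APIClient.logs`. `run_and_collect` is a hypothetical helper, not part of DockerOperator; the idea is that `stdout` would feed XCom while `stderr` would only go to the Airflow log.

```python
from typing import Any, Tuple


def run_and_collect(client: Any, container_id: str) -> Tuple[int, bytes, bytes]:
    """Hypothetical helper: start a created container and collect its output.

    `client` is expected to behave like docker.APIClient (start/wait/logs).
    The container must not be auto-removed, or logs() fails after it exits.
    """
    client.start(container_id)
    # Block until the container exits, so output cannot be missed by
    # attaching too late.
    result = client.wait(container_id)
    # With stream=False (the default), logs() returns the stored output as
    # a single bytes object; fetch each stream on its own.
    stdout = client.logs(container_id, stdout=True, stderr=False)
    stderr = client.logs(container_id, stdout=False, stderr=True)
    # docker-py returns the wait result as a dict containing 'StatusCode'.
    return result["StatusCode"], stdout, stderr
```

With a real daemon, `code, out, err = run_and_collect(d, c['Id'])` would stand in for the start/attach/wait sequence of the reproduction. The trade-off is that output is no longer streamed to the Airflow log while the container runs; it is read once, after the container exits.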
