nullhack opened a new issue #9164:
URL: https://github.com/apache/airflow/issues/9164


   
   **Apache Airflow version**: 1.10.10
   
   **Environment**: apache-airflow==1.10.10, docker==4.2.1
   
   **Problems**
   
   * **Issue 1**: When `xcom_push=True` is enabled (and `xcom_all=False`), the output is **sometimes null** or captured incorrectly (see screenshots below).
   * **Issue 2**: When `xcom_all=True`, a bytes string (`b'...'`) is stored as the xcom value. This makes the output harder to use in downstream operators and is inconsistent with other operators (e.g. BashOperator, which writes its output to xcom as a string; see screenshots below).
   * **Issue 3**: `stderr` and `stdout` are written to the same xcom output. In practice, we don't want warnings and errors mixed into output that downstream operators will parse (although we do need them in the Airflow logs). Sending `stderr` to xcom can lead to undefined/non-deterministic behavior (see screenshots below).
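A minimal pure-Python sketch of Issue 2, using hypothetical stand-in values rather than a real DockerOperator run. Airflow renders templates with Jinja, which turns a pulled value into text with `str()`, so a `bytes` xcom shows up with its `b'...'` prefix and escaped newlines, whereas a `str` xcom (what BashOperator pushes) renders as plain text. The f-strings below are an assumed stand-in for that rendering step.

```python
# Hypothetical stand-ins for what each operator pushes for the same ten lines.
lines = [str(i) for i in range(10)]
docker_xcom = "\n".join(lines).encode("utf-8")  # bytes, as described for xcom_all=True
bash_xcom = lines[-1]                            # str, as BashOperator pushes its last line

# Templating stringifies the pulled value; an f-string does the same here.
print(f"echo {docker_xcom}")  # echo b'0\n1\n2\n3\n4\n5\n6\n7\n8\n9'
print(f"echo {bash_xcom}")    # echo 9

# A downstream consumer of the bytes value has to decode it explicitly first.
print(docker_xcom.decode("utf-8").splitlines()[-1])  # 9
```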
   
   **What you expected to happen**:
   
   * **Issue 1**: As per the [documentation](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L107), I expect that if `xcom_push=True` and `xcom_all=False`, (only) the last line of the logs is pushed to xcom.
   * **Issue 2**: If `xcom_push=True` and `xcom_all=True`, I expect all log lines to be pushed to xcom. Right now DockerOperator encodes the data back [to bytes](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L262). This is not a good standard, because it requires an extra transformation before the next operators can use it (see screenshots below).
   * **Issue 3**: If warnings/errors are returned, I expect Airflow to log them, but not mix them into xcom (see screenshots below).
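A minimal sketch of the expected behavior, assuming the raw (non-TTY) container output is available as bytes in Docker's multiplexed stream format: each frame starts with an 8-byte header holding the stream type (1 = stdout, 2 = stderr), three zero bytes, and a big-endian 32-bit payload size. docker-py normally parses these frames itself; the point here is only that the two streams are distinguishable on the wire, so stdout can go to xcom as a `str` while stderr goes to the log. `demux`, `xcom_value` and `frame` are hypothetical helpers, not DockerOperator or docker-py API, and skipping blank lines when picking the "last line" is also an assumption.

```python
import struct
from typing import Optional, Tuple

# Stream-type byte in Docker's multiplexed (non-TTY) attach/logs stream.
STDOUT, STDERR = 1, 2
# Header: type (1 byte), 3 padding bytes, payload size (uint32, big-endian).
HEADER = struct.Struct(">BxxxL")


def demux(raw: bytes) -> Tuple[bytes, bytes]:
    """Split multiplexed container output into (stdout, stderr) byte strings."""
    out, err = bytearray(), bytearray()
    pos = 0
    while pos + HEADER.size <= len(raw):
        stream_type, size = HEADER.unpack_from(raw, pos)
        payload = raw[pos + HEADER.size:pos + HEADER.size + size]
        if stream_type == STDOUT:
            out += payload
        elif stream_type == STDERR:
            err += payload
        pos += HEADER.size + size
    return bytes(out), bytes(err)


def xcom_value(stdout: bytes, xcom_all: bool) -> Optional[str]:
    """Expected xcom value: decoded str, stdout only, last non-blank line unless xcom_all."""
    text = stdout.decode("utf-8", errors="replace")
    if xcom_all:
        return text
    lines = [line for line in text.splitlines() if line.strip()]
    return lines[-1] if lines else None


# Usage: build frames like the test command's output (ten stdout lines, one warning).
def frame(stream_type: int, payload: bytes) -> bytes:
    return HEADER.pack(stream_type, len(payload)) + payload


raw = b"".join(
    [frame(STDOUT, f"{i}\n".encode()) for i in range(10)]
    + [frame(STDERR, b"WARNING:root:this is a warning\n")]
)
stdout, stderr = demux(raw)
print(repr(xcom_value(stdout, xcom_all=False)))  # '9'
print(stderr.decode().strip())  # WARNING:root:this is a warning
```

With this split, Issue 1 and Issue 3 reduce to "take the last stdout line", and Issue 2 to "decode before pushing".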
   
   **Testing code**
   
   `Airflow DAG to test xcom_push and xcom_all`
   
   ```python
   from airflow import DAG
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   from airflow.operators.docker_operator import DockerOperator
   from airflow.operators.python_operator import PythonOperator
   from airflow.operators.bash_operator import BashOperator
   
   params = {
       'dag_id': 'test_xcom',
       'schedule_interval': '@daily',
       'catchup': True,
       'max_active_runs': 2,
       'default_args': {
           'owner': 'airflow',
           'start_date': days_ago(5),
           'retries': 2,
           'retry_delay': timedelta(minutes=5)
       }
   }
   
   with DAG(**params) as dag:
   
       write_xcom_docker_warning = DockerOperator(
           task_id='write_xcom_docker_warning',
           image='python:3-slim',
           api_version='auto',
           command="""python -c "import logging;[print(i) for i in range(10)];logging.warning('this is a warning')" """,
           xcom_push=True,
           xcom_all=False,
           network_mode='bridge')
   
       write_xcom_docker_all = DockerOperator(
           task_id='write_xcom_docker_all',
           image='python:3-slim',
           api_version='auto',
           command="""python -c "import logging;[print(i) for i in range(10)];logging.warning('this is a warning')" """,
           xcom_push=True,
           xcom_all=True,
           network_mode='bridge')
   
       write_xcom_docker_pull = DockerOperator(
           task_id='write_xcom_docker_pull',
           image='ubuntu:20.04',
           api_version='auto',
           command="""echo {{ ti.xcom_pull(task_ids="write_xcom_docker_warning") }}""",
           xcom_push=True,
           xcom_all=False,
           network_mode='bridge')
   
       write_xcom_bash_pull = BashOperator(
           task_id="write_xcom_bash_pull",
           bash_command="""echo {{ ti.xcom_pull(task_ids="write_xcom_docker_warning") }}""",
           xcom_push=True
       )
   
       type_data = PythonOperator(
           task_id="type_data",
           provide_context=True,
           python_callable=lambda **args: (
               'docker_xcom: ' +
               type(args['task_instance'].xcom_pull(task_ids='write_xcom_docker_pull')).__name__ +
               ' | bash_xcom: ' +
               type(args['task_instance'].xcom_pull(task_ids='write_xcom_bash_pull')).__name__
               ),
           xcom_push=True
       )
   
       write_xcom_docker_warning >> write_xcom_docker_all >> [write_xcom_docker_pull, write_xcom_bash_pull] >> type_data
   
   ```
   
   `Minimal code to reproduce null outputs from DockerOperator`
   
   In normal runs it is sometimes hard to observe `issue 1 - sometimes returning null strings`. The code below is a minimal working example that emulates the sequence of steps performed by [DockerOperator (`_run_image`)](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L246); because the workload is so small, null strings show up much more easily.
   
   ```python
   from docker import APIClient

   d = APIClient()
   c = d.create_container(
       image='ubuntu:20.04',
       name='TEST',
       command="""bash -c "echo 'test' && echo 'test2'" """,
       host_config=d.create_host_config(auto_remove=False, network_mode='bridge'))
   d.start(c['Id'])
   # Same order as DockerOperator: attach to the running container, then wait.
   print([i for i in d.attach(c['Id'], stderr=True, stdout=True, stream=True)])
   d.wait(c['Id'])
   # The full log, read after the container has exited, for comparison.
   print([i for i in d.logs(c['Id'], stream=True, stdout=True, stderr=True)])
   d.remove_container(c['Id'])
   ```
   
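   Note that in DockerOperator the [wait](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L255) call comes after the [attach](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L241) call. An attach stream only carries output produced after it connects, so output written by a short-lived container before the attach is established can be missed, which would explain the null values.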
   
   **Sample Output**
   
   * **Issue 1**:
     * Null returns:
   
![image](https://user-images.githubusercontent.com/11466701/83960062-01267300-a8b7-11ea-8f6e-ab1030cccaee.png)
     * Captured wrongly:
   
![image](https://user-images.githubusercontent.com/11466701/83960114-7db95180-a8b7-11ea-8e7e-75528d0f56e2.png)
   `We would expect only the last line '9'`
   * **Issue 2**:
     * Does not conform with similar operators
   
![image](https://user-images.githubusercontent.com/11466701/83960173-413a2580-a8b8-11ea-9601-82297ac0537c.png)
     * Bytes output is harder to work with after a pull
   
![image](https://user-images.githubusercontent.com/11466701/83960203-9f670880-a8b8-11ea-839d-6b70914e1466.png)
   * **Issue 3**:
     * `stderr` messes up the output
   
![image](https://user-images.githubusercontent.com/11466701/83960249-25834f00-a8b9-11ea-84a1-2d8b1e32d984.png)
     * Non-deterministic behavior
   
![image](https://user-images.githubusercontent.com/11466701/83960257-3fbd2d00-a8b9-11ea-8524-2cb1d59541ba.png)
   
   

