nullhack opened a new pull request #9464:
URL: https://github.com/apache/airflow/pull/9464


   Solves the issue https://github.com/apache/airflow/issues/9164
   This continues the discussion of https://github.com/apache/airflow/pull/9165
   
   **Files changed**: 
   `airflow/providers/docker/operators/docker.py`
   `airflow/providers/docker/example_dags/example_docker_xcom.py`
   
   **Problems**
   
   * **Issue 1**: When `xcom_push=True` is enabled (and `xcom_push_all=False`), the output is **sometimes null** or captured incorrectly.
   * **Issue 2**: When `xcom_push_all=True`, a bytes string (`b'...'`) is stored as the xcom value. This makes the output harder to use in following operators and does not conform with other operators.
   * **Issue 3**: `stderr` and `stdout` are written to the same output xcom. In practice, we don't want warnings and errors mixed into output that following operators will parse (though we still need to capture them in the Airflow logs). Sending `stderr` to xcom can lead to undefined/non-deterministic behavior.
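
To make Issue 2 concrete, here is a small standalone sketch (plain Python, no Airflow or Docker involved; the sample value `raw` is made up for illustration) of why a bytes value is awkward for whatever consumes it next. Whether the JSON point matters in practice depends on how XCom serialization is configured.

```python
import json

raw = b"9\n"  # hypothetical container output, still as bytes

# Formatting bytes directly leaks the b'...' wrapper into the text.
as_text = str(raw)

# Decoding first gives the plain string a following operator would expect.
decoded = raw.decode("utf-8").strip()

# bytes are also not JSON serializable, unlike the decoded string.
try:
    json.dumps(raw)
    json_ok = True
except TypeError:
    json_ok = False
```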
   
   **Solutions**:
   
   * **Issue 1**:  refactored the generator, using logs
   
   ```python
               def gen_output(stdout=False, stderr=False):
                   return (
                       templ.decode("utf-8").strip()
                       if hasattr(templ, "decode")
                       else str(templ)
                       for templ in self.cli.logs(
                           self.container["Id"], stream=True, stdout=stdout, stderr=stderr
                       )
                   )
   
               for line in gen_output(stdout=True, stderr=True):
                   self.log.info(line)
   ```
   
   * **Issue 2**:  I just added:
   ```python
   ...
                       templ.decode("utf-8").strip()
                       if hasattr(templ, "decode")
                       else str(templ)
   ...
   ```
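
The same normalization can be tried outside the operator. The sketch below is only an illustration: `normalize_chunk` and the sample `chunks` are hypothetical names and data, not part of the PR. It mirrors the snippet above, including the detail that only decoded bytes are stripped.

```python
def normalize_chunk(chunk):
    """Return text for a log chunk: decode bytes, otherwise fall back to str()."""
    if hasattr(chunk, "decode"):
        # bytes (and bytearray) expose .decode(); Python 3 str does not.
        return chunk.decode("utf-8").strip()
    return str(chunk)


# Hypothetical mix of chunks, as a log stream might yield them.
chunks = [b"step 1\n", b"  9\n", "already text"]
lines = [normalize_chunk(c) for c in chunks]
```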
   
   * **Issue 3**:  I store only `stdout` in xcom:
   ```python
               return_value = None
               if self.do_xcom_push:
                   lines = gen_output(stdout=True)
                   if self.xcom_all:
                       return_value = "\n".join(lines)
                   else:
                       line_deque = deque(lines, maxlen=1)
                       return_value = line_deque.pop()
   ```
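
The selection logic above can be exercised in isolation. This is a minimal sketch, not the operator code: `select_xcom_value` and `fake_stdout` are hypothetical names, and the generator stands in for the container's stdout stream. One deliberate difference is that an empty stream returns `None` here, whereas calling `pop()` on an empty deque raises `IndexError`.

```python
from collections import deque


def select_xcom_value(lines, xcom_all):
    """Pick the value to push: every line joined, or only the last line."""
    if xcom_all:
        return "\n".join(lines)
    # A deque with maxlen=1 consumes the iterator and keeps only the final item.
    last = deque(lines, maxlen=1)
    # Assumption for this sketch: return None when no line was produced.
    return last.pop() if last else None


def fake_stdout():
    """Stand-in for the container's stdout stream."""
    yield from ("1", "2", "9")


all_lines = select_xcom_value(fake_stdout(), xcom_all=True)
last_line = select_xcom_value(fake_stdout(), xcom_all=False)
```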
   In the end, only the `stdout` logs are sent to xcom, and if any error is raised, the DAG run fails after the container finishes:
   ```python
               result = self.cli.wait(self.container["Id"])
               if result["StatusCode"] != 0:
                   raise AirflowException("docker container failed: " + repr(result))
   ```
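
The exit-status check can be sketched without Docker or Airflow. In the example below, `ContainerFailed` is a stand-in for `AirflowException` and `check_exit` is a hypothetical helper; the dictionaries imitate the shape of the result the snippet above reads `StatusCode` from.

```python
class ContainerFailed(Exception):
    """Stand-in for AirflowException so the sketch runs without Airflow."""


def check_exit(result):
    """Raise if the wait result reports a non-zero exit status."""
    if result["StatusCode"] != 0:
        raise ContainerFailed("docker container failed: " + repr(result))
    return result["StatusCode"]


ok_status = check_exit({"StatusCode": 0})

try:
    check_exit({"StatusCode": 1})
    failure_message = None
except ContainerFailed as exc:
    failure_message = str(exc)
```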
   
   **Sample Output**
   
   * **Issue 1**:
     * Captured correctly:
   
![image](https://user-images.githubusercontent.com/11466701/85225181-db928100-b401-11ea-92a7-5a10d8f30ad6.png)
   
   `We would expect only the last line '9'`
   * **Issue 2**:
     * Easier to pull outputs and use them in other operators
   
![image](https://user-images.githubusercontent.com/11466701/85225198-07156b80-b402-11ea-91eb-5e63ab874871.png)
   
   
   * **Issue 3**:
     * `stderr` does not get mixed into the output, so the behavior is deterministic
   
![image](https://user-images.githubusercontent.com/11466701/85225301-c0744100-b402-11ea-9393-0ec0bd75a9fa.png)
   
   ---
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes) `unnecessary`
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions. `unnecessary`
   - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   

