nullhack opened a new pull request #9165:
URL: https://github.com/apache/airflow/pull/9165


   Fixes https://github.com/apache/airflow/issues/9164
   
   **File changed**: `airflow/providers/docker/operators/docker.py`
   
   **Problems**
   
   * **Issue 1**: When `xcom_push=True` is set (and `xcom_push_all=False`), the pushed output is **sometimes null** or captured incorrectly.
   * **Issue 2**: When `xcom_push_all=True`, a bytes string (`b'...'`) is stored as the XCom value. This makes the output harder to use in downstream operators and is inconsistent with other operators.
   * **Issue 3**: `stderr` and `stdout` are written to the same XCom value. In practice, we don't want warnings and errors mixed into output that downstream operators will parse, although we still want them captured in the Airflow logs. Sending `stderr` to XCom can lead to undefined, non-deterministic behavior.
   
   **Solutions**:
   
   * **Issue 1**: The root cause is that [attach](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L241) starts before [wait](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L255). However, running `wait` before `attach` with `stream=True` appears to hang without returning any output. I did not dig deeper, but this may later turn out to be an issue in the Docker Python SDK. A simple solution is to use `logs`, since it is a [wrapper for attach](https://github.com/docker/docker-py/blob/master/docker/api/container.py#L22). I took this approach and refactored part of the code. Instead of making two calls, I save the output in a list after `wait`:
   
   ```python
               self.cli.start(self.container["Id"])
   
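               # Block until the container exits, then read its stdout only.
               result = self.cli.wait(self.container["Id"])
   
               lines = [
                   # Decode bytes chunks to str; fall back to str() for anything else.
                   l.decode("utf-8").strip() if hasattr(l, "decode") else str(l)
                   for l in self.cli.logs(
                       self.container["Id"], stream=True, stdout=True, stderr=False
                   )
               ]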
   ```
   Why a list and not a generator?
   Because the output is used twice, and a generator can only be consumed once:
   ```python
               for line in lines:
                   self.log.info(line)
      
               ...
   
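               if self.xcom_push_flag:
                   # Use lines[-1] rather than the loop variable, which is unbound if there is no output.
                   return "\n".join(lines) if self.xcom_all else (lines[-1] if lines else None)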
   ```
   I kept it as a list to keep the code simple and to avoid repeating the request, as the original code did.
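The list-versus-generator point can be shown with a minimal, self-contained sketch. `fake_log_stream` and `xcom_value` are hypothetical helpers for illustration only. `fake_log_stream` merely imitates the bytes chunks yielded by `self.cli.logs(..., stream=True)`. The sketch also returns `None` when there is no output, instead of relying on a loop variable that would be unbound for an empty list.

```python
from typing import Iterator, List, Optional


def fake_log_stream() -> Iterator[bytes]:
    """Hypothetical stand-in for `self.cli.logs(..., stream=True)`: yields bytes chunks."""
    yield b"line one\n"
    yield b"line two\n"
    yield b"9\n"


# A generator can only be consumed once: the second pass sees nothing.
stream = fake_log_stream()
first_pass = [chunk for chunk in stream]
second_pass = [chunk for chunk in stream]  # already exhausted -> []

# Materialising into a list lets us log every line *and* pick the XCom value.
lines: List[str] = [chunk.decode("utf-8").strip() for chunk in fake_log_stream()]
for line in lines:
    print(line)  # stands in for self.log.info(line)


def xcom_value(lines: List[str], push_all: bool) -> Optional[str]:
    """Hypothetical helper: all lines joined, or only the last one; None if no output."""
    if push_all:
        return "\n".join(lines)
    return lines[-1] if lines else None
```

With the list in hand, `xcom_value(lines, push_all=False)` gives `"9"`. An empty run gives `None` rather than an error.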
   * **Issue 2**: I added the following decoding step:
   ```python
   ...
   l.decode("utf-8").strip() if hasattr(l, "decode") else str(l)
   ...
   ```
   This ensures that every line is decoded when possible. Otherwise, its string representation is returned. DockerOperator always returns bytes strings, but if other object types become possible in the future, this keeps the behavior consistent and avoids errors.
   **I expect this part may prompt some discussion**, because it changes the output from `bytes` to `str`. In my opinion, `bytes` is a problem and should be changed to `str`, as in BashOperator. There is no documentation on what value DockerOperator should store as XCom. If this is a big issue, I suggest adding a flag (`xcom_string=False`), although personally I don't like that approach.
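The decoding expression can be isolated into a small helper to show exactly what ends up in XCom. `to_text` is a hypothetical name used only for this sketch. Its body reproduces the expression from the PR unchanged.

```python
from typing import Any


def to_text(chunk: Any) -> str:
    """Hypothetical helper mirroring the PR's expression:

    l.decode("utf-8").strip() if hasattr(l, "decode") else str(l)
    """
    if hasattr(chunk, "decode"):
        # bytes / bytearray: decode as UTF-8 and drop the trailing newline.
        return chunk.decode("utf-8").strip()
    # Anything else (e.g. an int) falls back to its string representation.
    return str(chunk)


print(to_text(b"9\n"))                     # bytes from Docker become a plain str
print(to_text(42))                         # non-bytes objects use str()
print(to_text("caf\u00e9".encode("utf-8")))  # UTF-8 content survives the round trip
```

Only the bytes branch calls `.strip()`. Values that go through `str()` keep any surrounding whitespace.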
   * **Issue 3**: I created a separate list that stores only `stderr`:
   ```python
               # Collect stderr separately so it is logged but never pushed to XCom.
               warnings = [
                   w.decode("utf-8").strip() if hasattr(w, "decode") else str(w)
                   for w in self.cli.logs(
                       self.container["Id"], stream=True, stdout=False, stderr=True
                   )
               ]
   
               for warning in warnings:
                   self.log.warning(warning)
   ```
   Finally, only `stdout` is sent to XCom. If the container exits with a non-zero status code, an `AirflowException` is raised and the task fails:
   ```python
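               if result["StatusCode"] != 0:
                   raise AirflowException("docker container failed: " + repr(result))
   
               if self.xcom_push_flag:
                   # Use lines[-1] rather than the loop variable, which is unbound if there is no output.
                   return "\n".join(lines) if self.xcom_all else (lines[-1] if lines else None)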
   ```
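The Issue 1 and Issue 3 changes can be exercised together without a Docker daemon. This is a sketch under stated assumptions:

- `FakeDockerClient` is a hypothetical stand-in. It only mimics the `wait()` return value (`{"StatusCode": ...}`) and the `stdout`/`stderr` filtering of the Docker SDK's `logs()`.
- `to_text` and `run_and_collect` are hypothetical helpers.
- `RuntimeError` replaces `AirflowException` so the example runs without Airflow installed.

```python
import logging
from typing import Any, Dict, Iterator, List, Optional


class FakeDockerClient:
    """Hypothetical stand-in for the Docker API client, returning canned output."""

    def __init__(self, out: List[bytes], err: List[bytes], status: int) -> None:
        self._out = out
        self._err = err
        self._status = status

    def wait(self, container_id: str) -> Dict[str, int]:
        # Mimics the dict with a "StatusCode" key returned once the container exits.
        return {"StatusCode": self._status}

    def logs(self, container_id: str, stream: bool = True,
             stdout: bool = True, stderr: bool = True) -> Iterator[bytes]:
        # Always yields chunks; `stream` is accepted only to match the call shape.
        if stdout:
            yield from self._out
        if stderr:
            yield from self._err


def to_text(chunk: Any) -> str:
    """Hypothetical helper: same decoding expression as in the PR."""
    return chunk.decode("utf-8").strip() if hasattr(chunk, "decode") else str(chunk)


def run_and_collect(cli: FakeDockerClient, container_id: str,
                    push_all: bool) -> Optional[str]:
    """Hypothetical helper: log stderr, fail on non-zero exit, return stdout for XCom."""
    result = cli.wait(container_id)
    lines = [to_text(c) for c in cli.logs(container_id, stream=True,
                                          stdout=True, stderr=False)]
    warnings = [to_text(c) for c in cli.logs(container_id, stream=True,
                                             stdout=False, stderr=True)]

    for line in lines:
        logging.info(line)
    for warning in warnings:
        logging.warning(warning)  # stderr goes to the logs only, never to XCom

    if result["StatusCode"] != 0:
        raise RuntimeError("docker container failed: " + repr(result))

    if push_all:
        return "\n".join(lines)
    return lines[-1] if lines else None
```

Take a client whose stdout is `b"1\n", b"9\n"` and whose stderr is a warning line. With `push_all=True` it yields `"1\n9"`; with `push_all=False` it yields `"9"`. The warning only reaches the logs.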
   
   **Sample Output**
   
   * **Issue 1**:
     * Captured correctly:
   
![image](https://user-images.githubusercontent.com/11466701/83961479-1060ed00-a8c6-11ea-9fb8-860c8b60107b.png)
   `As expected, only the last line ('9') is captured.`
   * **Issue 2**:
     * Consistent with similar operators
   
![image](https://user-images.githubusercontent.com/11466701/83961584-1e633d80-a8c7-11ea-9fd6-783b26ffa1a5.png)
   
     * Easier to pull the output and use it in other operators
   
![image](https://user-images.githubusercontent.com/11466701/83961601-3dfa6600-a8c7-11ea-805c-a2471887eaac.png)
   
   
   * **Issue 3**:
     * `stderr` no longer pollutes the output
   
![image](https://user-images.githubusercontent.com/11466701/83961626-89ad0f80-a8c7-11ea-9cba-bd64f81fe34a.png)
   `Note that newlines are not rendered in the UI, but the value is correct when consumed by another operator.`
     * Deterministic behavior
   
![image](https://user-images.githubusercontent.com/11466701/83961661-ead4e300-a8c7-11ea-8b7e-59a95eb57c92.png)
   
   ---
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes) 
`unnecessary`
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions. 
`unnecessary`
   - [X] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
