nullhack opened a new pull request #9165: URL: https://github.com/apache/airflow/pull/9165
Solves the issue https://github.com/apache/airflow/issues/9164

**File changed**: `airflow/providers/docker/operators/docker.py`

**Problems**

* **Issue 1**: When `xcom_push=True` is enabled (and `xcom_push_all=False`), the output **is sometimes null** or is captured incorrectly.
* **Issue 2**: When `xcom_push_all=True`, a bytes string (`b'...'`) is stored as the XCom value. This makes the output harder to use in downstream operators, and it does not conform with other operators.
* **Issue 3**: `stderr` and `stdout` are written to the same XCom output. In practice, we don't want warnings and errors mixed into output that downstream operators will parse, although we still need them in the Airflow logs. Sending `stderr` to XCom can lead to undefined, non-deterministic behavior.

**Solutions**:

* **Issue 1**: The problem is that [attach](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L241) starts before [wait](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L255). However, running `wait` before `attach` with `stream=True` also seems to cause a problem: the call freezes and returns no output. I did not dig deep, but this will probably become an issue in the `docker python SDK` later. A simple solution is to use `logs`, because it's a [wrapper for attach](https://github.com/docker/docker-py/blob/master/docker/api/container.py#L22). I took this approach and refactored part of the code. Instead of making two calls, I save the output in a list after `wait`:

  ```python
  self.cli.start(self.container["Id"])
  result = self.cli.wait(self.container["Id"])
  lines = [
      l.decode("utf-8").strip() if hasattr(l, "decode") else str(l)
      for l in self.cli.logs(
          self.container["Id"], stream=True, stdout=True, stderr=False
      )
  ]
  ```

  Why a list, not a generator? The output is used twice, and a generator can be consumed only once:

  ```python
  for line in lines:
      self.log.info(line)
  ...
  if self.xcom_push_flag:
      return "\n".join(lines) if self.xcom_all else line
  ```

  So I decided to keep it as a list. This keeps the code simpler and avoids repeating the request, which the original code did.
* **Issue 2**: I just added:

  ```python
  ...
  l.decode("utf-8").strip() if hasattr(l, "decode") else str(l)
  ...
  ```

  This ensures that every line is decoded when possible. Otherwise, its string representation is returned. DockerOperator always returns bytes strings, but this keeps the output consistent and avoids errors if other object types become possible in the future. **I expect this part may cause some discussion**, because I'm changing the output from `bytes` to `string`. My personal opinion is that returning `bytes` is a problem and should be changed to `string`, as BashOperator does. There's no documentation on what value DockerOperator should store as XCom. If this is a big issue, I suggest adding a flag (`xcom_string=False`), although personally I don't like that approach.
* **Issue 3**: I created a separate list that only stores `stderr`:

  ```python
  warnings = [
      w.decode("utf-8").strip() if hasattr(w, "decode") else str(w)
      for w in self.cli.logs(
          self.container["Id"], stream=True, stdout=False, stderr=True
      )
  ]
  for warning in warnings:
      self.log.warning(warning)
  ```

  In the end, only the `stdout` lines are sent to XCom. If the container exits with a non-zero status code, an `AirflowException` is raised and the task fails:

  ```python
  if result["StatusCode"] != 0:
      raise AirflowException("docker container failed: " + repr(result))
  if self.xcom_push_flag:
      return "\n".join(lines) if self.xcom_all else line
  ```

**Sample Output**

* **Issue 1**:
  * Captured correctly: `We would expect only the last line '9'`
* **Issue 2**:
  * Conforms with similar operators
  * Easier to pull outputs and use them in other operators
* **Issue 3**:
  * `stderr` no longer mixes into the output: `Note that new lines are not shown in the UI, but if used in an operator the output will be correct`
  * Deterministic behavior

---

- [X] Description above provides context of the change
- [X] Unit tests coverage for changes (not needed for documentation changes) `unnecessary`
- [X] Target Github ISSUE in description if exists
- [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
- [X] Relevant documentation is updated including usage instructions. `unnecessary`
- [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).

---

In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.

In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).

In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).

Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
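The Issue 1 solution keeps the log output in a list because a generator can be consumed only once. The sketch below illustrates that behavior. It is self-contained and uses a hypothetical `fake_log_stream()` in place of the Docker SDK's streaming `logs` call, so it is an illustration of the reasoning, not the operator's code:

```python
def fake_log_stream():
    """Hypothetical stand-in for a streaming log call: yields raw byte chunks."""
    yield b"1\n"
    yield b"2\n"
    yield b"9\n"


# A generator is exhausted after the first pass...
stream = fake_log_stream()
first_pass = [chunk.decode("utf-8").strip() for chunk in stream]
second_pass = [chunk.decode("utf-8").strip() for chunk in stream]
print(first_pass)   # ['1', '2', '9']
print(second_pass)  # []

# ...whereas a list can be iterated as often as needed:
# once for logging, and again to build the XCom value.
lines = [chunk.decode("utf-8").strip() for chunk in fake_log_stream()]
for line in lines:
    print("log:", line)
last_line = lines[-1] if lines else None
print(last_line)  # 9
```

Reading the stream twice would require a second request to the Docker daemon. Materializing it once avoids that request.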
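The decoding expression from the Issue 2 solution is repeated for both `stdout` and `stderr`, so it can be factored into a small helper. The sketch below is illustrative only. `decode_log_line` is a hypothetical name that does not come from the PR or from Airflow, and the function mirrors the PR's expression exactly:

```python
from typing import Any


def decode_log_line(chunk: Any) -> str:
    """Hypothetical helper: turn one log chunk into a str.

    Bytes (what the Docker SDK yields) are decoded as UTF-8 and stripped of
    surrounding whitespace; anything else falls back to str(), unstripped,
    exactly like the expression in the PR.
    """
    if hasattr(chunk, "decode"):
        return chunk.decode("utf-8").strip()
    return str(chunk)


print(decode_log_line(b"hello world\n"))  # hello world
print(decode_log_line(42))                # 42
```

If a container can emit bytes that are not valid UTF-8, `chunk.decode("utf-8")` raises `UnicodeDecodeError`. Passing `errors="replace"` would avoid that, but this is a suggestion and is not part of the PR.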
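The sketch below is a self-contained version of the flow described in the Issue 1 and Issue 3 solutions. It shows concretely how `stdout` and `stderr` are separated and what ends up as the XCom return value. It relies on several assumptions:

* `FakeDockerClient` is hypothetical. It only mimics the shape of the calls used in the PR: `start`, `wait` returning a dict with `"StatusCode"`, and `logs(..., stream=True, stdout=..., stderr=...)`.
* `run_container` is a hypothetical function, not the operator's method.
* `RuntimeError` stands in for `AirflowException`.

```python
import logging
from typing import Any, Dict, Iterator, List, Optional

log = logging.getLogger("docker_operator_sketch")


class FakeDockerClient:
    """Hypothetical stand-in mimicking only the Docker SDK calls used in the PR."""

    def __init__(self, stdout: List[bytes], stderr: List[bytes], status: int = 0) -> None:
        self._stdout = stdout
        self._stderr = stderr
        self._status = status

    def start(self, container_id: str) -> None:
        pass  # a real client would start the container here

    def wait(self, container_id: str) -> Dict[str, int]:
        # Assumption: the exit code is reported under "StatusCode", as the PR's code expects.
        return {"StatusCode": self._status}

    def logs(self, container_id: str, stream: bool = False,
             stdout: bool = True, stderr: bool = True) -> Iterator[bytes]:
        # This fake always yields chunks; `stream` is accepted only to match the call site.
        if stdout:
            yield from self._stdout
        if stderr:
            yield from self._stderr


def _decode(chunk: Any) -> str:
    # Same decoding rule as in the PR: bytes -> stripped UTF-8 str, otherwise str().
    return chunk.decode("utf-8").strip() if hasattr(chunk, "decode") else str(chunk)


def run_container(cli: FakeDockerClient, container_id: str,
                  xcom_push: bool = True, xcom_all: bool = False) -> Optional[str]:
    cli.start(container_id)
    result = cli.wait(container_id)  # wait for the container to finish, then read its logs

    # stdout only: the candidate XCom value.
    lines = [_decode(c) for c in cli.logs(container_id, stream=True, stdout=True, stderr=False)]
    # stderr only: logged as warnings, never returned.
    warnings = [_decode(c) for c in cli.logs(container_id, stream=True, stdout=False, stderr=True)]

    for line in lines:
        log.info(line)
    for warning in warnings:
        log.warning(warning)

    if result["StatusCode"] != 0:
        # RuntimeError stands in for AirflowException in this sketch.
        raise RuntimeError("docker container failed: " + repr(result))
    if not xcom_push:
        return None
    if xcom_all:
        return "\n".join(lines)
    # Guarded last line: returns None instead of raising when stdout is empty.
    return lines[-1] if lines else None


cli = FakeDockerClient(stdout=[b"1\n", b"2\n", b"9\n"], stderr=[b"UserWarning: something odd\n"])
print(run_container(cli, "abc"))                       # 9
print(repr(run_container(cli, "abc", xcom_all=True)))  # '1\n2\n9'
```

The snippet in the PR returns the loop variable `line`, while this sketch returns `lines[-1]`. The two agree whenever the container writes something to `stdout`. When `stdout` is empty, the sketch returns `None`, whereas referencing `line` would raise an `UnboundLocalError`.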
