akki commented on a change in pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#discussion_r439729529



##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -243,30 +245,39 @@ def _run_image(self):
                 tty=self.tty,
             )
 
-            lines = self.cli.attach(container=self.container['Id'],
-                                    stdout=True,
-                                    stderr=True,
-                                    stream=True)
+            self.cli.start(self.container["Id"])
 
-            self.cli.start(self.container['Id'])
+            result = self.cli.wait(self.container["Id"])
+
+            lines = [
+                templ.decode("utf-8").strip() if hasattr(templ, "decode") else str(templ)
+                for templ in self.cli.logs(
+                    self.container["Id"], stream=True, stdout=True, stderr=False
+                )
+            ]
 
             line = ''
             for line in lines:
-                line = line.strip()
-                if hasattr(line, 'decode'):
-                    line = line.decode('utf-8')
                 self.log.info(line)
 
-            result = self.cli.wait(self.container['Id'])
-            if result['StatusCode'] != 0:
-                raise AirflowException('docker container failed: ' + repr(result))
+            warnings = [
+                tempw.decode("utf-8").strip() if hasattr(tempw, "decode") else str(tempw)
+                for tempw in self.cli.logs(
+                    self.container["Id"], stream=True, stdout=False, stderr=True
+                )
+            ]
+
+            for warning in warnings:
+                self.log.warning(warning)
+
+            if result["StatusCode"] != 0:
+                raise AirflowException("docker container failed: " + repr(result))
 
-            # duplicated conditional logic because of expensive operation
             ret = None
             if self.do_xcom_push:
-                ret = self.cli.logs(container=self.container['Id']) \
-                    if self.xcom_all else line.encode('utf-8')
+                ret = "\n".join(lines) if self.xcom_all else line
 
+            self.cli.stop(self.container['Id'])

Review comment:
   I have a couple of questions here:
   1. Why does Airflow need to stop the Docker container explicitly?
   2. If I understand correctly, the [`.wait()` API](https://docker-py.readthedocs.io/en/stable/api.html#docker.api.container.ContainerApiMixin.wait), which we already call above, blocks until the container has stopped. Why would the container still be running at this point, and why is this call needed?

   If there is a reason that justifies this call, I would suggest adding some logging here (at warning level?) telling the user that Airflow forcefully stopped the container, so that it is easy for them to debug.
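
   To illustrate the kind of logging I mean, here is a minimal sketch, not a proposal for the final implementation. It assumes `cli` behaves like docker-py's low-level `APIClient` (`inspect_container` and `stop`). The helper name `stop_if_still_running` and the `FakeClient` stub are hypothetical and exist only so the example runs without a Docker daemon.

```python
import logging

log = logging.getLogger(__name__)


def stop_if_still_running(cli, container_id):
    """Stop the container only if the daemon still reports it as running.

    `cli` is assumed to expose inspect_container() and stop() like
    docker-py's APIClient. Returns True if a stop was issued.
    """
    state = cli.inspect_container(container_id).get("State", {})
    if not state.get("Running", False):
        # wait() already blocked until the container stopped: nothing to do.
        return False
    log.warning(
        "Container %s was still running after wait(); stopping it.",
        container_id,
    )
    cli.stop(container_id)
    return True


class FakeClient:
    """Hypothetical stand-in for APIClient, used only for this sketch."""

    def __init__(self, running):
        self.running = running
        self.stopped = []

    def inspect_container(self, container_id):
        return {"State": {"Running": self.running}}

    def stop(self, container_id):
        self.stopped.append(container_id)
        self.running = False
```

   With something like this, the `stop()` call becomes a no-op in the normal case where `.wait()` has already returned, and the user gets a warning in the unexpected case.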



