This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 615aa18  [AIRFLOW-4363] Fix JSON encoding error (#8287)
615aa18 is described below

commit 615aa18859e529039a2116204be7a877cc493ee3
Author: retornam <retor...@users.noreply.github.com>
AuthorDate: Fri May 1 13:22:10 2020 -0700

    [AIRFLOW-4363] Fix JSON encoding error (#8287)
    
    (cherry picked from commit 511d98e30ded2bcce9d246b358f806cea45ebcb7)
---
 airflow/operators/docker_operator.py    | 10 ++++++----
 tests/operators/test_docker_operator.py |  6 ++++--
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py
index 85c17d8..d0a872a 100644
--- a/airflow/operators/docker_operator.py
+++ b/airflow/operators/docker_operator.py
@@ -19,9 +19,10 @@
 """
 Implements Docker operator
 """
-import json
 
 import ast
+
+import six
 from docker import APIClient, tls
 
 from airflow.hooks.docker_hook import DockerHook
@@ -265,9 +266,10 @@ class DockerOperator(BaseOperator):
         # Pull the docker image if `force_pull` is set or image does not exist locally
         if self.force_pull or len(self.cli.images(name=self.image)) == 0:
             self.log.info('Pulling docker image %s', self.image)
-            for line in self.cli.pull(self.image, stream=True, decode=True):
-                output = json.loads(line.decode('utf-8').strip())
-                if 'status' in output:
+            for output in self.cli.pull(self.image, stream=True, decode=True):
+                if isinstance(output, six.string_types):
+                    self.log.info("%s", output)
+                if isinstance(output, dict) and 'status' in output:
                     self.log.info("%s", output['status'])
 
         self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
diff --git a/tests/operators/test_docker_operator.py b/tests/operators/test_docker_operator.py
index ca4709a..aae5593 100644
--- a/tests/operators/test_docker_operator.py
+++ b/tests/operators/test_docker_operator.py
@@ -44,7 +44,7 @@ class DockerOperatorTestCase(unittest.TestCase):
         client_mock.images.return_value = []
         client_mock.attach.return_value = ['container log']
         client_mock.logs.return_value = ['container log']
-        client_mock.pull.return_value = [b'{"status":"pull log"}']
+        client_mock.pull.return_value = {"status": "pull log"}
         client_mock.wait.return_value = {"StatusCode": 0}
 
         client_class_mock.return_value = client_mock
@@ -88,6 +88,8 @@ class DockerOperatorTestCase(unittest.TestCase):
         client_mock.pull.assert_called_with('ubuntu:latest', stream=True,
                                             decode=True)
         client_mock.wait.assert_called_with('some_id')
+        self.assertEqual(operator.cli.pull('ubuntu:latest', stream=True, decode=True),
+                         client_mock.pull.return_value)
 
     @mock.patch('airflow.operators.docker_operator.tls.TLSConfig')
     @mock.patch('airflow.operators.docker_operator.APIClient')
@@ -250,7 +252,7 @@ class DockerOperatorTestCase(unittest.TestCase):
         client_mock.images.return_value = []
         client_mock.create_container.return_value = {'Id': 'some_id'}
         client_mock.attach.return_value = ['container log']
-        client_mock.pull.return_value = [b'{"status":"pull log"}']
+        client_mock.pull.return_value = {"status": "pull log"}
         client_mock.wait.return_value = {"StatusCode": 0}
 
         client_class_mock.return_value = client_mock

Reply via email to