This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push: new 615aa18 [AIRFLOW-4363] Fix JSON encoding error (#8287) 615aa18 is described below commit 615aa18859e529039a2116204be7a877cc493ee3 Author: retornam <retor...@users.noreply.github.com> AuthorDate: Fri May 1 13:22:10 2020 -0700 [AIRFLOW-4363] Fix JSON encoding error (#8287) (cherry picked from commit 511d98e30ded2bcce9d246b358f806cea45ebcb7) --- airflow/operators/docker_operator.py | 10 ++++++---- tests/operators/test_docker_operator.py | 6 ++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py index 85c17d8..d0a872a 100644 --- a/airflow/operators/docker_operator.py +++ b/airflow/operators/docker_operator.py @@ -19,9 +19,10 @@ """ Implements Docker operator """ -import json import ast + +import six from docker import APIClient, tls from airflow.hooks.docker_hook import DockerHook @@ -265,9 +266,10 @@ class DockerOperator(BaseOperator): # Pull the docker image if `force_pull` is set or image does not exist locally if self.force_pull or len(self.cli.images(name=self.image)) == 0: self.log.info('Pulling docker image %s', self.image) - for line in self.cli.pull(self.image, stream=True, decode=True): - output = json.loads(line.decode('utf-8').strip()) - if 'status' in output: + for output in self.cli.pull(self.image, stream=True, decode=True): + if isinstance(output, six.string_types): + self.log.info("%s", output) + if isinstance(output, dict) and 'status' in output: self.log.info("%s", output['status']) self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir diff --git a/tests/operators/test_docker_operator.py b/tests/operators/test_docker_operator.py index ca4709a..aae5593 100644 --- a/tests/operators/test_docker_operator.py +++ b/tests/operators/test_docker_operator.py @@ -44,7 +44,7 @@ class DockerOperatorTestCase(unittest.TestCase): client_mock.images.return_value = [] client_mock.attach.return_value = ['container log'] client_mock.logs.return_value = ['container log'] - client_mock.pull.return_value = [b'{"status":"pull log"}'] + client_mock.pull.return_value = {"status": "pull log"} client_mock.wait.return_value = {"StatusCode": 0} client_class_mock.return_value = client_mock @@ -88,6 +88,8 @@ class DockerOperatorTestCase(unittest.TestCase): client_mock.pull.assert_called_with('ubuntu:latest', stream=True, decode=True) client_mock.wait.assert_called_with('some_id') + self.assertEqual(operator.cli.pull('ubuntu:latest', stream=True, decode=True), + client_mock.pull.return_value) @mock.patch('airflow.operators.docker_operator.tls.TLSConfig') @mock.patch('airflow.operators.docker_operator.APIClient') @@ -250,7 +252,7 @@ class DockerOperatorTestCase(unittest.TestCase): client_mock.images.return_value = [] client_mock.create_container.return_value = {'Id': 'some_id'} client_mock.attach.return_value = ['container log'] - client_mock.pull.return_value = [b'{"status":"pull log"}'] + client_mock.pull.return_value = {"status": "pull log"} client_mock.wait.return_value = {"StatusCode": 0} client_class_mock.return_value = client_mock