This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 615aa18 [AIRFLOW-4363] Fix JSON encoding error (#8287)
615aa18 is described below
commit 615aa18859e529039a2116204be7a877cc493ee3
Author: retornam <[email protected]>
AuthorDate: Fri May 1 13:22:10 2020 -0700
[AIRFLOW-4363] Fix JSON encoding error (#8287)
(cherry picked from commit 511d98e30ded2bcce9d246b358f806cea45ebcb7)
---
airflow/operators/docker_operator.py | 10 ++++++----
tests/operators/test_docker_operator.py | 6 ++++--
2 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/airflow/operators/docker_operator.py
b/airflow/operators/docker_operator.py
index 85c17d8..d0a872a 100644
--- a/airflow/operators/docker_operator.py
+++ b/airflow/operators/docker_operator.py
@@ -19,9 +19,10 @@
"""
Implements Docker operator
"""
-import json
import ast
+
+import six
from docker import APIClient, tls
from airflow.hooks.docker_hook import DockerHook
@@ -265,9 +266,10 @@ class DockerOperator(BaseOperator):
# Pull the docker image if `force_pull` is set or image does not exist
locally
if self.force_pull or len(self.cli.images(name=self.image)) == 0:
self.log.info('Pulling docker image %s', self.image)
- for line in self.cli.pull(self.image, stream=True, decode=True):
- output = json.loads(line.decode('utf-8').strip())
- if 'status' in output:
+ for output in self.cli.pull(self.image, stream=True, decode=True):
+ if isinstance(output, six.string_types):
+ self.log.info("%s", output)
+ if isinstance(output, dict) and 'status' in output:
self.log.info("%s", output['status'])
self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
diff --git a/tests/operators/test_docker_operator.py
b/tests/operators/test_docker_operator.py
index ca4709a..aae5593 100644
--- a/tests/operators/test_docker_operator.py
+++ b/tests/operators/test_docker_operator.py
@@ -44,7 +44,7 @@ class DockerOperatorTestCase(unittest.TestCase):
client_mock.images.return_value = []
client_mock.attach.return_value = ['container log']
client_mock.logs.return_value = ['container log']
- client_mock.pull.return_value = [b'{"status":"pull log"}']
+ client_mock.pull.return_value = {"status": "pull log"}
client_mock.wait.return_value = {"StatusCode": 0}
client_class_mock.return_value = client_mock
@@ -88,6 +88,8 @@ class DockerOperatorTestCase(unittest.TestCase):
client_mock.pull.assert_called_with('ubuntu:latest', stream=True,
decode=True)
client_mock.wait.assert_called_with('some_id')
+ self.assertEqual(operator.cli.pull('ubuntu:latest', stream=True,
decode=True),
+ client_mock.pull.return_value)
@mock.patch('airflow.operators.docker_operator.tls.TLSConfig')
@mock.patch('airflow.operators.docker_operator.APIClient')
@@ -250,7 +252,7 @@ class DockerOperatorTestCase(unittest.TestCase):
client_mock.images.return_value = []
client_mock.create_container.return_value = {'Id': 'some_id'}
client_mock.attach.return_value = ['container log']
- client_mock.pull.return_value = [b'{"status":"pull log"}']
+ client_mock.pull.return_value = {"status": "pull log"}
client_mock.wait.return_value = {"StatusCode": 0}
client_class_mock.return_value = client_mock