Fokko closed pull request #3914: [AIRFLOW-3069] Log all output of the S3 file
transform script
URL: https://github.com/apache/incubator-airflow/pull/3914
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py
index da82fa952c..4a0b3ad2ee 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -19,6 +19,7 @@
from tempfile import NamedTemporaryFile
import subprocess
+import sys
from airflow.exceptions import AirflowException
from airflow.hooks.S3_hook import S3Hook
@@ -98,6 +99,7 @@ def __init__(
self.replace = replace
self.transform_script = transform_script
self.select_expression = select_expression
+ self.output_encoding = sys.getdefaultencoding()
def execute(self, context):
if self.transform_script is None and self.select_expression is None:
@@ -132,15 +134,23 @@ def execute(self, context):
f_source.flush()
if self.transform_script is not None:
- transform_script_process = subprocess.Popen(
+ process = subprocess.Popen(
[self.transform_script, f_source.name, f_dest.name],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
close_fds=True)
- (transform_script_stdoutdata, transform_script_stderrdata) = \
- transform_script_process.communicate()
- self.log.info("Transform script stdout %s", transform_script_stdoutdata)
- if transform_script_process.returncode > 0:
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ close_fds=True
+ )
+
+ self.log.info("Output:")
+ for line in iter(process.stdout.readline, b''):
+ self.log.info(line.decode(self.output_encoding).rstrip())
+
+ process.wait()
+
+ if process.returncode > 0:
raise AirflowException(
- "Transform script failed %s", transform_script_stderrdata)
+ "Transform script failed: {0}".format(process.returncode)
+ )
else:
self.log.info(
"Transform script successful. Output temporarily located at %s",
diff --git a/tests/operators/test_s3_file_transform_operator.py b/tests/operators/test_s3_file_transform_operator.py
index ee788f9d26..da7a4b0bed 100644
--- a/tests/operators/test_s3_file_transform_operator.py
+++ b/tests/operators/test_s3_file_transform_operator.py
@@ -22,6 +22,7 @@
import io
import os
import shutil
+import sys
import unittest
from tempfile import mkdtemp
@@ -29,6 +30,7 @@
import mock
from moto import mock_s3
+from airflow.exceptions import AirflowException
from airflow.operators.s3_file_transform_operator import S3FileTransformOperator
@@ -48,11 +50,15 @@ def tearDown(self):
raise e
@mock.patch('subprocess.Popen')
+ @mock.patch.object(S3FileTransformOperator, 'log')
@mock_s3
- def test_execute_with_transform_script(self, mock_Popen):
- transform_script_process = mock_Popen.return_value
- transform_script_process.communicate.return_value = [None, None]
- transform_script_process.returncode = 0
+ def test_execute_with_transform_script(self, mock_log, mock_Popen):
+ process_output = [b"Foo", b"Bar", b"Baz"]
+
+ process = mock_Popen.return_value
+ process.stdout.readline.side_effect = process_output
+ process.wait.return_value = None
+ process.returncode = 0
bucket = "bucket"
input_key = "foo"
@@ -72,6 +78,40 @@ def test_execute_with_transform_script(self, mock_Popen):
task_id="task_id")
t.execute(None)
+ mock_log.info.assert_has_calls([
+ mock.call(line.decode(sys.getdefaultencoding())) for line in process_output
+ ])
+
+ @mock.patch('subprocess.Popen')
+ @mock_s3
+ def test_execute_with_failing_transform_script(self, mock_Popen):
+ process = mock_Popen.return_value
+ process.stdout.readline.side_effect = []
+ process.wait.return_value = None
+ process.returncode = 42
+
+ bucket = "bucket"
+ input_key = "foo"
+ output_key = "bar"
+ bio = io.BytesIO(b"input")
+
+ conn = boto3.client('s3')
+ conn.create_bucket(Bucket=bucket)
+ conn.upload_fileobj(Bucket=bucket, Key=input_key, Fileobj=bio)
+
+ s3_url = "s3://{0}/{1}"
+ t = S3FileTransformOperator(
+ source_s3_key=s3_url.format(bucket, input_key),
+ dest_s3_key=s3_url.format(bucket, output_key),
+ transform_script=self.transform_script,
+ replace=True,
+ task_id="task_id")
+
+ with self.assertRaises(AirflowException) as e:
+ t.execute(None)
+
+ self.assertEqual('Transform script failed: 42', str(e.exception))
+
@mock.patch('airflow.hooks.S3_hook.S3Hook.select_key', return_value="input")
@mock_s3
def test_execute_with_select_expression(self, mock_select_key):
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services