[ https://issues.apache.org/jira/browse/AIRFLOW-3069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649749#comment-16649749 ]

ASF GitHub Bot commented on AIRFLOW-3069:
-----------------------------------------

Fokko closed pull request #3914: [AIRFLOW-3069] Log all output of the S3 file transform script
URL: https://github.com/apache/incubator-airflow/pull/3914

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py
index da82fa952c..4a0b3ad2ee 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -19,6 +19,7 @@
 
 from tempfile import NamedTemporaryFile
 import subprocess
+import sys
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.S3_hook import S3Hook
@@ -98,6 +99,7 @@ def __init__(
         self.replace = replace
         self.transform_script = transform_script
         self.select_expression = select_expression
+        self.output_encoding = sys.getdefaultencoding()
 
     def execute(self, context):
         if self.transform_script is None and self.select_expression is None:
@@ -132,15 +134,23 @@ def execute(self, context):
             f_source.flush()
 
             if self.transform_script is not None:
-                transform_script_process = subprocess.Popen(
+                process = subprocess.Popen(
                     [self.transform_script, f_source.name, f_dest.name],
-                    stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
-                (transform_script_stdoutdata, transform_script_stderrdata) = \
-                    transform_script_process.communicate()
-                self.log.info("Transform script stdout %s", transform_script_stdoutdata)
-                if transform_script_process.returncode > 0:
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.STDOUT,
+                    close_fds=True
+                )
+
+                self.log.info("Output:")
+                for line in iter(process.stdout.readline, b''):
+                    self.log.info(line.decode(self.output_encoding).rstrip())
+
+                process.wait()
+
+                if process.returncode > 0:
                     raise AirflowException(
-                        "Transform script failed %s", 
transform_script_stderrdata)
+                        "Transform script failed: 
{0}".format(process.returncode)
+                    )
                 else:
                     self.log.info(
                         "Transform script successful. Output temporarily 
located at %s",
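
For readers skimming the diff: the key change is replacing a single communicate() call with line-by-line streaming of a merged stdout/stderr pipe. A minimal standalone sketch of that pattern follows; the logger name and script paths are hypothetical, not part of the patch:

{code}
import logging
import subprocess
import sys

log = logging.getLogger("example")  # stands in for the operator's self.log

# Merge stderr into stdout so a single stream carries all script output.
process = subprocess.Popen(
    ["/path/to/transform_script", "/tmp/in", "/tmp/out"],  # hypothetical paths
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    close_fds=True,
)

# iter(..., b'') keeps reading until readline() returns an empty bytes
# object, i.e. until the script exits and the pipe is closed.
for line in iter(process.stdout.readline, b""):
    log.info(line.decode(sys.getdefaultencoding()).rstrip())

process.wait()
if process.returncode > 0:
    raise RuntimeError("Transform script failed: {0}".format(process.returncode))
{code}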
diff --git a/tests/operators/test_s3_file_transform_operator.py b/tests/operators/test_s3_file_transform_operator.py
index ee788f9d26..da7a4b0bed 100644
--- a/tests/operators/test_s3_file_transform_operator.py
+++ b/tests/operators/test_s3_file_transform_operator.py
@@ -22,6 +22,7 @@
 import io
 import os
 import shutil
+import sys
 import unittest
 from tempfile import mkdtemp
 
@@ -29,6 +30,7 @@
 import mock
 from moto import mock_s3
 
+from airflow.exceptions import AirflowException
 from airflow.operators.s3_file_transform_operator import S3FileTransformOperator
 
 
@@ -48,11 +50,15 @@ def tearDown(self):
                 raise e
 
     @mock.patch('subprocess.Popen')
+    @mock.patch.object(S3FileTransformOperator, 'log')
     @mock_s3
-    def test_execute_with_transform_script(self, mock_Popen):
-        transform_script_process = mock_Popen.return_value
-        transform_script_process.communicate.return_value = [None, None]
-        transform_script_process.returncode = 0
+    def test_execute_with_transform_script(self, mock_log, mock_Popen):
+        process_output = [b"Foo", b"Bar", b"Baz"]
+
+        process = mock_Popen.return_value
+        process.stdout.readline.side_effect = process_output
+        process.wait.return_value = None
+        process.returncode = 0
 
         bucket = "bucket"
         input_key = "foo"
@@ -72,6 +78,40 @@ def test_execute_with_transform_script(self, mock_Popen):
             task_id="task_id")
         t.execute(None)
 
+        mock_log.info.assert_has_calls([
+            mock.call(line.decode(sys.getdefaultencoding())) for line in process_output
+        ])
+
+    @mock.patch('subprocess.Popen')
+    @mock_s3
+    def test_execute_with_failing_transform_script(self, mock_Popen):
+        process = mock_Popen.return_value
+        process.stdout.readline.side_effect = []
+        process.wait.return_value = None
+        process.returncode = 42
+
+        bucket = "bucket"
+        input_key = "foo"
+        output_key = "bar"
+        bio = io.BytesIO(b"input")
+
+        conn = boto3.client('s3')
+        conn.create_bucket(Bucket=bucket)
+        conn.upload_fileobj(Bucket=bucket, Key=input_key, Fileobj=bio)
+
+        s3_url = "s3://{0}/{1}"
+        t = S3FileTransformOperator(
+            source_s3_key=s3_url.format(bucket, input_key),
+            dest_s3_key=s3_url.format(bucket, output_key),
+            transform_script=self.transform_script,
+            replace=True,
+            task_id="task_id")
+
+        with self.assertRaises(AirflowException) as e:
+            t.execute(None)
+
+        self.assertEqual('Transform script failed: 42', str(e.exception))
+
     @mock.patch('airflow.hooks.S3_hook.S3Hook.select_key', return_value="input")
     @mock_s3
     def test_execute_with_select_expression(self, mock_select_key):
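
For context, this is roughly how the patched operator is exercised from a DAG; the bucket, keys, and script path below are illustrative and not part of the patch:

{code}
from datetime import datetime

from airflow import DAG
from airflow.operators.s3_file_transform_operator import S3FileTransformOperator

with DAG(dag_id="s3_transform_example",  # hypothetical DAG
         start_date=datetime(2018, 10, 1),
         schedule_interval=None) as dag:
    transform = S3FileTransformOperator(
        task_id="transform",
        source_s3_key="s3://my-bucket/input/data.csv",   # hypothetical key
        dest_s3_key="s3://my-bucket/output/data.csv",    # hypothetical key
        transform_script="/usr/local/bin/transform.sh",  # hypothetical script
        replace=True,
    )
{code}

With this change, everything the script writes to stdout or stderr shows up in the task log as it is produced, rather than as a single bytes literal after the fact.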


 


> Decode output of S3 file transform operator
> -------------------------------------------
>
>                 Key: AIRFLOW-3069
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3069
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: aws
>    Affects Versions: 1.10.0
>            Reporter: Szymon Bilinski
>            Assignee: Szymon Bilinski
>            Priority: Minor
>             Fix For: 1.10.1
>
>
> h3. Current behaviour
> {{S3FileTransformOperator}} logs the {{stdout}} of the underlying process as follows:
> {code}
> [2018-09-15 23:17:13,850] {{s3_file_transform_operator.py:122}} INFO - Transform script stdout b'Copying /tmp/tmpd5rjo8g0 to /tmp/tmpd3vkhzte\nDone\n'
> {code}
> {{stderr}}, meanwhile, is omitted entirely unless the exit code is non-zero (in which case it is included in the exception message only).
> h3. Proposed behaviour
> 1. Both streams are logged, regardless of the underlying process outcome (i.e. success or failure).
> 2. Stream output is decoded before logging (e.g. {{\n}} is replaced with an actual new line); see the sketch below.
> 3. If {{transform_script}} fails, the exception message contains the return code of the process.
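>
> For point 2, the decoding amounts to something like the following sketch; the encoding choice mirrors {{sys.getdefaultencoding()}} as used in the patch:
> {code}
> import sys
>
> # bytes as they come off the pipe, e.g. the output quoted above
> raw = b'Copying /tmp/tmpd5rjo8g0 to /tmp/tmpd3vkhzte\nDone\n'
>
> for line in raw.splitlines():
>     # splitting and decoding turn the escaped \n into real log line breaks
>     print(line.decode(sys.getdefaultencoding()))
> {code}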


