[
https://issues.apache.org/jira/browse/AIRFLOW-3069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649749#comment-16649749
]
ASF GitHub Bot commented on AIRFLOW-3069:
-----------------------------------------
Fokko closed pull request #3914: [AIRFLOW-3069] Log all output of the S3 file
transform script
URL: https://github.com/apache/incubator-airflow/pull/3914
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/airflow/operators/s3_file_transform_operator.py
b/airflow/operators/s3_file_transform_operator.py
index da82fa952c..4a0b3ad2ee 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -19,6 +19,7 @@
from tempfile import NamedTemporaryFile
import subprocess
+import sys
from airflow.exceptions import AirflowException
from airflow.hooks.S3_hook import S3Hook
@@ -98,6 +99,7 @@ def __init__(
self.replace = replace
self.transform_script = transform_script
self.select_expression = select_expression
+ self.output_encoding = sys.getdefaultencoding()
def execute(self, context):
if self.transform_script is None and self.select_expression is None:
@@ -132,15 +134,23 @@ def execute(self, context):
f_source.flush()
if self.transform_script is not None:
- transform_script_process = subprocess.Popen(
+ process = subprocess.Popen(
[self.transform_script, f_source.name, f_dest.name],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
close_fds=True)
- (transform_script_stdoutdata, transform_script_stderrdata) = \
- transform_script_process.communicate()
- self.log.info("Transform script stdout %s",
transform_script_stdoutdata)
- if transform_script_process.returncode > 0:
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ close_fds=True
+ )
+
+ self.log.info("Output:")
+ for line in iter(process.stdout.readline, b''):
+ self.log.info(line.decode(self.output_encoding).rstrip())
+
+ process.wait()
+
+ if process.returncode > 0:
raise AirflowException(
- "Transform script failed %s",
transform_script_stderrdata)
+ "Transform script failed:
{0}".format(process.returncode)
+ )
else:
self.log.info(
"Transform script successful. Output temporarily
located at %s",
diff --git a/tests/operators/test_s3_file_transform_operator.py
b/tests/operators/test_s3_file_transform_operator.py
index ee788f9d26..da7a4b0bed 100644
--- a/tests/operators/test_s3_file_transform_operator.py
+++ b/tests/operators/test_s3_file_transform_operator.py
@@ -22,6 +22,7 @@
import io
import os
import shutil
+import sys
import unittest
from tempfile import mkdtemp
@@ -29,6 +30,7 @@
import mock
from moto import mock_s3
+from airflow.exceptions import AirflowException
from airflow.operators.s3_file_transform_operator import
S3FileTransformOperator
@@ -48,11 +50,15 @@ def tearDown(self):
raise e
@mock.patch('subprocess.Popen')
+ @mock.patch.object(S3FileTransformOperator, 'log')
@mock_s3
- def test_execute_with_transform_script(self, mock_Popen):
- transform_script_process = mock_Popen.return_value
- transform_script_process.communicate.return_value = [None, None]
- transform_script_process.returncode = 0
+ def test_execute_with_transform_script(self, mock_log, mock_Popen):
+ process_output = [b"Foo", b"Bar", b"Baz"]
+
+ process = mock_Popen.return_value
+ process.stdout.readline.side_effect = process_output
+ process.wait.return_value = None
+ process.returncode = 0
bucket = "bucket"
input_key = "foo"
@@ -72,6 +78,40 @@ def test_execute_with_transform_script(self, mock_Popen):
task_id="task_id")
t.execute(None)
+ mock_log.info.assert_has_calls([
+ mock.call(line.decode(sys.getdefaultencoding())) for line in
process_output
+ ])
+
+ @mock.patch('subprocess.Popen')
+ @mock_s3
+ def test_execute_with_failing_transform_script(self, mock_Popen):
+ process = mock_Popen.return_value
+ process.stdout.readline.side_effect = []
+ process.wait.return_value = None
+ process.returncode = 42
+
+ bucket = "bucket"
+ input_key = "foo"
+ output_key = "bar"
+ bio = io.BytesIO(b"input")
+
+ conn = boto3.client('s3')
+ conn.create_bucket(Bucket=bucket)
+ conn.upload_fileobj(Bucket=bucket, Key=input_key, Fileobj=bio)
+
+ s3_url = "s3://{0}/{1}"
+ t = S3FileTransformOperator(
+ source_s3_key=s3_url.format(bucket, input_key),
+ dest_s3_key=s3_url.format(bucket, output_key),
+ transform_script=self.transform_script,
+ replace=True,
+ task_id="task_id")
+
+ with self.assertRaises(AirflowException) as e:
+ t.execute(None)
+
+ self.assertEqual('Transform script failed: 42', str(e.exception))
+
@mock.patch('airflow.hooks.S3_hook.S3Hook.select_key',
return_value="input")
@mock_s3
def test_execute_with_select_expression(self, mock_select_key):
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Decode output of S3 file transform operator
> -------------------------------------------
>
> Key: AIRFLOW-3069
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3069
> Project: Apache Airflow
> Issue Type: Improvement
> Components: aws
> Affects Versions: 1.10.0
> Reporter: Szymon Bilinski
> Assignee: Szymon Bilinski
> Priority: Minor
> Fix For: 1.10.1
>
>
> h3. Current behaviour
> {{S3FileTransformOperator}} logs {{stdout}} of the underlying process as follows:
> {code}
> [2018-09-15 23:17:13,850] {{s3_file_transform_operator.py:122}} INFO -
> Transform script stdout b'Copying /tmp/tmpd5rjo8g0 to
> /tmp/tmpd3vkhzte\nDone\n'
> {code}
> Meanwhile, {{stderr}} is omitted entirely unless the exit code is not {{0}}, in
> which case it is included only in the exception message.
> h3. Proposed behaviour
> 1. Both streams are logged, regardless of the underlying process outcome
> (i.e. success or failure).
> 2. Stream output is decoded before logging (e.g. {{\n}} is replaced with an
> actual new line).
> 3. If {{transform_script}} fails, the exception message contains the return
> code of the process.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)